Repository: incubator-reef Updated Branches: refs/heads/master 16f6c4b28 -> 22f651f8c
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java deleted file mode 100644 index b2e0083..0000000 --- a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java +++ /dev/null @@ -1,724 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.javabridge.generic; - -import org.apache.reef.driver.client.JobMessageObserver; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.context.ClosedContext; -import org.apache.reef.driver.context.ContextMessage; -import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.driver.evaluator.*; -import org.apache.reef.driver.task.*; -import org.apache.reef.io.network.naming.NameServer; -import org.apache.reef.javabridge.*; -import org.apache.reef.runtime.common.DriverRestartCompleted; -import org.apache.reef.runtime.common.driver.DriverStatusManager; -import org.apache.reef.tang.annotations.Unit; -import org.apache.reef.util.Optional; -import org.apache.reef.util.logging.CLRBufferedLogHandler; -import org.apache.reef.util.logging.LoggingScope; -import org.apache.reef.util.logging.LoggingScopeFactory; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.remote.NetUtils; -import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; -import org.apache.reef.wake.time.Clock; -import org.apache.reef.wake.time.event.Alarm; -import org.apache.reef.wake.time.event.StartTime; -import org.apache.reef.wake.time.event.StopTime; -import org.apache.reef.webserver.*; - -import javax.inject.Inject; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.logging.Handler; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Generic job driver for CLRBridge. - */ -@Unit -public final class JobDriver { - - private static final Logger LOG = Logger.getLogger(JobDriver.class.getName()); - /** - * String codec is used to encode the results - * before passing them back to the client. - */ - private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>(); - private final InteropLogger interopLogger = new InteropLogger(); - private final NameServer nameServer; - private final String nameServerInfo; - private final HttpServer httpServer; - /** - * Wake clock is used to schedule periodical job check-ups. - */ - private final Clock clock; - /** - * Job observer on the client. - * We use it to send results from the driver back to the client. - */ - private final JobMessageObserver jobMessageObserver; - /** - * Job driver uses EvaluatorRequestor - * to request Evaluators that will run the Tasks. - */ - private final EvaluatorRequestor evaluatorRequestor; - - /** - * Driver status manager to monitor driver status - */ - private final DriverStatusManager driverStatusManager; - - /** - * NativeInterop has function to load libs when driver starts - */ - private final LibLoader libLoader; - - /** - * Shell execution results from each Evaluator. - */ - private final List<String> results = new ArrayList<>(); - /** - * Map from context ID to running evaluator context. - */ - private final Map<String, ActiveContext> contexts = new HashMap<>(); - - /** - * Logging scope factory that provides LoggingScope - */ - private final LoggingScopeFactory loggingScopeFactory; - - private long evaluatorRequestorHandler = 0; - private long allocatedEvaluatorHandler = 0; - private long activeContextHandler = 0; - private long taskMessageHandler = 0; - private long failedTaskHandler = 0; - private long failedEvaluatorHandler = 0; - private long httpServerEventHandler = 0; - private long completedTaskHandler = 0; - private long runningTaskHandler = 0; - private long suspendedTaskHandler = 0; - private long completedEvaluatorHandler = 0; - private long closedContextHandler = 0; - private long failedContextHandler = 0; - private long contextMessageHandler = 0; - private long driverRestartHandler = 0; - private long driverRestartActiveContextHandler = 0; - private long driverRestartRunningTaskHandler = 0; - private boolean clrBridgeSetup = false; - private boolean isRestarted = false; - - /** - * Job driver constructor. - * All parameters are injected from TANG automatically. - * - * @param clock Wake clock to schedule and check up running jobs. - * @param jobMessageObserver is used to send messages back to the client. - * @param evaluatorRequestor is used to request Evaluators. - */ - @Inject - JobDriver(final Clock clock, - final HttpServer httpServer, - final NameServer nameServer, - final JobMessageObserver jobMessageObserver, - final EvaluatorRequestor evaluatorRequestor, - final DriverStatusManager driverStatusManager, - final LoggingScopeFactory loggingScopeFactory, - final LibLoader libLoader) { - this.clock = clock; - this.httpServer = httpServer; - this.jobMessageObserver = jobMessageObserver; - this.evaluatorRequestor = evaluatorRequestor; - this.nameServer = nameServer; - this.driverStatusManager = driverStatusManager; - this.nameServerInfo = NetUtils.getLocalAddress() + ":" + this.nameServer.getPort(); - this.loggingScopeFactory = loggingScopeFactory; - this.libLoader = libLoader; - } - - private void setupBridge(final StartTime startTime) { - // Signal to the clr buffered log handler that the driver has started and that - // we can begin logging - LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler..."); - try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) { - - try { - libLoader.loadLib(); - } catch (IOException e) { - throw new RuntimeException("Fail to load CLR libraries"); - } - - final CLRBufferedLogHandler handler = getCLRBufferedLogHandler(); - if (handler == null) { - LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized"); - } else { - handler.setDriverInitialized(); - LOG.log(Level.INFO, "CLRBufferedLogHandler init complete."); - } - - LOG.log(Level.INFO, "StartTime: {0}", new Object[]{startTime}); - String portNumber = httpServer == null ? null : Integer.toString((httpServer.getPort())); - long[] handlers = NativeInterop.CallClrSystemOnStartHandler(startTime.toString(), portNumber); - if (handlers != null) { - if (handlers.length != NativeInterop.nHandlers) { - throw new RuntimeException( - String.format("%s handlers initialized in CLR while native bridge is expecting %s handlers", - String.valueOf(handlers.length), - String.valueOf(NativeInterop.nHandlers))); - } - this.evaluatorRequestorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.EvaluatorRequestorKey)]; - this.allocatedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.AllocatedEvaluatorKey)]; - this.activeContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ActiveContextKey)]; - this.taskMessageHandler = handlers[NativeInterop.Handlers.get(NativeInterop.TaskMessageKey)]; - this.failedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedTaskKey)]; - this.failedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedEvaluatorKey)]; - this.httpServerEventHandler = handlers[NativeInterop.Handlers.get(NativeInterop.HttpServerKey)]; - this.completedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.CompletedTaskKey)]; - this.runningTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.RunningTaskKey)]; - this.suspendedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.SuspendedTaskKey)]; - this.completedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.CompletedEvaluatorKey)]; - this.closedContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ClosedContextKey)]; - this.failedContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedContextKey)]; - this.contextMessageHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ContextMessageKey)]; - this.driverRestartHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartKey)]; - this.driverRestartActiveContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartActiveContextKey)]; - this.driverRestartRunningTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartRunningTaskKey)]; - } - - try (final LoggingScope lp = this.loggingScopeFactory.getNewLoggingScope("setupBridge::ClrSystemHttpServerHandlerOnNext")) { - final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC"); - NativeInterop.ClrSystemHttpServerHandlerOnNext(this.httpServerEventHandler, httpServerEventBridge, this.interopLogger); - final String specList = httpServerEventBridge.getUriSpecification(); - LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList); - if (specList != null) { - final String[] specs = specList.split(":"); - for (final String s : specs) { - final HttpHandler h = new HttpServerBridgeEventHandler(); - h.setUriSpecification(s); - this.httpServer.addHttpHandler(h); - } - } - } - this.clrBridgeSetup = true; - } - LOG.log(Level.INFO, "CLR Bridge setup."); - } - - private CLRBufferedLogHandler getCLRBufferedLogHandler() { - for (Handler handler : Logger.getLogger("").getHandlers()) { - if (handler instanceof CLRBufferedLogHandler) - return (CLRBufferedLogHandler) handler; - } - return null; - } - - private void submitEvaluator(final AllocatedEvaluator eval, EvaluatorType type) { - synchronized (JobDriver.this) { - eval.setType(type); - LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}", - new Object[]{eval.getId(), JobDriver.this.contexts.size()}); - if (JobDriver.this.allocatedEvaluatorHandler == 0) { - throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR."); - } - AllocatedEvaluatorBridge allocatedEvaluatorBridge = new AllocatedEvaluatorBridge(eval, JobDriver.this.nameServerInfo); - NativeInterop.ClrSystemAllocatedEvaluatorHandlerOnNext(JobDriver.this.allocatedEvaluatorHandler, allocatedEvaluatorBridge, this.interopLogger); - } - } - - /** - * Submit a Task to a single Evaluator. - */ - private void submit(final ActiveContext context) { - try { - LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context}); - if (JobDriver.this.activeContextHandler == 0) { - throw new RuntimeException("Active Context Handler not initialized by CLR."); - } - ActiveContextBridge activeContextBridge = new ActiveContextBridge(context); - NativeInterop.ClrSystemActiveContextHandlerOnNext(JobDriver.this.activeContextHandler, activeContextBridge, JobDriver.this.interopLogger); - } catch (final Exception ex) { - LOG.log(Level.SEVERE, "Fail to submit task to active context"); - context.close(); - throw new RuntimeException(ex); - } - } - - /** - * Handles AllocatedEvaluator: Submit an empty context - */ - final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { - @Override - public void onNext(final AllocatedEvaluator allocatedEvaluator) { - try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) { - synchronized (JobDriver.this) { - LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext"); - JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.CLR); - } - } - } - } - - /** - * Receive notification that a new Context is available. - */ - final class ActiveContextHandler implements EventHandler<ActiveContext> { - @Override - public void onNext(final ActiveContext context) { - try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) { - synchronized (JobDriver.this) { - LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}", - new Object[]{context.getId()}); - JobDriver.this.contexts.put(context.getId(), context); - JobDriver.this.submit(context); - } - } - } - } - - /** - * Receive notification that the Task has completed successfully. - */ - final class CompletedTaskHandler implements EventHandler<CompletedTask> { - @Override - public void onNext(final CompletedTask task) { - LOG.log(Level.INFO, "Completed task: {0}", task.getId()); - try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) { - // Take the message returned by the task and add it to the running result. - String result = "default result"; - try { - result = new String(task.get()); - } catch (final Exception e) { - LOG.log(Level.WARNING, "failed to decode task outcome"); - } - LOG.log(Level.INFO, "Return results to the client:\n{0}", result); - JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result)); - if (JobDriver.this.completedTaskHandler == 0) { - LOG.log(Level.INFO, "No CLR handler bound to handle completed task."); - } else { - LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler."); - CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task); - NativeInterop.ClrSystemCompletedTaskHandlerOnNext(JobDriver.this.completedTaskHandler, completedTaskBridge, JobDriver.this.interopLogger); - } - } - } - } - - /** - * Receive notification that the entire Evaluator had failed. - */ - final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { - @Override - public void onNext(final FailedEvaluator eval) { - try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) { - synchronized (JobDriver.this) { - LOG.log(Level.SEVERE, "FailedEvaluator", eval); - for (final FailedContext failedContext : eval.getFailedContextList()) { - String failedContextId = failedContext.getId(); - LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts."); - JobDriver.this.contexts.remove(failedContextId); - } - String message = "Evaluator " + eval.getId() + " failed with message: " - + eval.getEvaluatorException().getMessage(); - JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes()); - - if (failedEvaluatorHandler == 0) { - if (JobDriver.this.clrBridgeSetup) { - message = "No CLR FailedEvaluator handler was set, exiting now"; - LOG.log(Level.WARNING, message); - JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes()); - return; - } else { - clock.scheduleAlarm(0, new EventHandler<Alarm>() { - @Override - public void onNext(final Alarm time) { - if (JobDriver.this.clrBridgeSetup) { - handleFailedEvaluatorInCLR(eval); - } else { - LOG.log(Level.INFO, "Waiting for CLR bridge to be set up"); - clock.scheduleAlarm(5000, this); - } - } - }); - } - } else { - handleFailedEvaluatorInCLR(eval); - } - } - } - } - - private void handleFailedEvaluatorInCLR(final FailedEvaluator eval) { - final String message = "CLR FailedEvaluator handler set, handling things with CLR handler."; - LOG.log(Level.INFO, message); - FailedEvaluatorBridge failedEvaluatorBridge = new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor, JobDriver.this.isRestarted, loggingScopeFactory); - NativeInterop.ClrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler, failedEvaluatorBridge, JobDriver.this.interopLogger); - int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber(); - if (additionalRequestedEvaluatorNumber > 0) { - LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " + additionalRequestedEvaluatorNumber); - } - JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes()); - } - } - - final class HttpServerBridgeEventHandler implements HttpHandler { - private String uriSpecification; - - /** - * returns URI specification for the handler - */ - @Override - public String getUriSpecification() { - return uriSpecification; - } - - public void setUriSpecification(String s) { - uriSpecification = s; - } - - /** - * process http request - */ - @Override - public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response) throws IOException, ServletException { - LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri()); - try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) { - final AvroHttpSerializer httpSerializer = new AvroHttpSerializer(); - final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest); - final byte[] requestBytes = httpSerializer.toBytes(avroHttpRequest); - - try { - final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes); - NativeInterop.ClrSystemHttpServerHandlerOnNext(JobDriver.this.httpServerEventHandler, httpServerEventBridge, JobDriver.this.interopLogger); - final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8"); - response.getWriter().println(responseBody); - LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody); - } catch (final Exception ex) { - LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex); - throw new RuntimeException(ex); - } - } - } - } - - /** - * Handle failed task. - */ - final class FailedTaskHandler implements EventHandler<FailedTask> { - @Override - public void onNext(final FailedTask task) throws RuntimeException { - LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set."); - if (JobDriver.this.failedTaskHandler == 0) { - LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real."); - throw new RuntimeException("Failed Task Handler not initialized by CLR."); - } - try { - FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task); - NativeInterop.ClrSystemFailedTaskHandlerOnNext(JobDriver.this.failedTaskHandler, failedTaskBridge, JobDriver.this.interopLogger); - } catch (final Exception ex) { - LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler"); - throw new RuntimeException(ex); - } - } - } - - /** - * Receive notification that the Task is running. - */ - final class RunningTaskHandler implements EventHandler<RunningTask> { - @Override - public void onNext(final RunningTask task) { - try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) { - if (JobDriver.this.runningTaskHandler == 0) { - LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler."); - } else { - LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId()); - try { - final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task); - NativeInterop.ClrSystemRunningTaskHandlerOnNext(JobDriver.this.runningTaskHandler, runningTaskBridge, JobDriver.this.interopLogger); - } catch (final Exception ex) { - LOG.log(Level.WARNING, "Fail to invoke CLR running task handler"); - throw new RuntimeException(ex); - } - } - } - } - } - - /** - * Receive notification that the Task is running when driver restarted. - */ - final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> { - @Override - public void onNext(final RunningTask task) { - try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) { - clock.scheduleAlarm(0, new EventHandler<Alarm>() { - @Override - public void onNext(final Alarm time) { - if (JobDriver.this.clrBridgeSetup) { - if (JobDriver.this.driverRestartRunningTaskHandler != 0) { - LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR."); - NativeInterop.ClrSystemDriverRestartRunningTaskHandlerOnNext(JobDriver.this.driverRestartRunningTaskHandler, new RunningTaskBridge(task)); - } else { - LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, done with DriverRestartRunningTaskHandler."); - } - } else { - LOG.log(Level.INFO, "Waiting for driver to complete restart process before checking out CLR driver restart RunningTaskHandler..."); - clock.scheduleAlarm(2000, this); - } - } - }); - } - } - } - - /** - * Receive notification that an context is active on Evaluator when the driver restarted - */ - final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> { - @Override - public void onNext(final ActiveContext context) { - try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) { - JobDriver.this.contexts.put(context.getId(), context); - LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId()); - clock.scheduleAlarm(0, new EventHandler<Alarm>() { - @Override - public void onNext(final Alarm time) { - if (JobDriver.this.clrBridgeSetup) { - if (JobDriver.this.driverRestartActiveContextHandler != 0) { - LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR."); - NativeInterop.ClrSystemDriverRestartActiveContextHandlerOnNext(JobDriver.this.driverRestartActiveContextHandler, new ActiveContextBridge(context)); - } else { - LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, done with DriverRestartActiveContextHandler."); - } - } else { - LOG.log(Level.INFO, "Waiting for driver to complete restart process before checking out CLR driver restart DriverRestartActiveContextHandler..."); - clock.scheduleAlarm(2000, this); - } - } - }); - } - } - } - - /** - * Job Driver is ready and the clock is set up: request the evaluators. - */ - final class StartHandler implements EventHandler<StartTime> { - @Override - public void onNext(final StartTime startTime) { - try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) { - synchronized (JobDriver.this) { - - setupBridge(startTime); - - LOG.log(Level.INFO, "Driver Started"); - - if (JobDriver.this.evaluatorRequestorHandler == 0) { - throw new RuntimeException("Evaluator Requestor Handler not initialized by CLR."); - } - EvaluatorRequestorBridge evaluatorRequestorBridge = new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory); - NativeInterop.ClrSystemEvaluatorRequstorHandlerOnNext(JobDriver.this.evaluatorRequestorHandler, evaluatorRequestorBridge, JobDriver.this.interopLogger); - // get the evaluator numbers set by CLR handler - LOG.log(Level.INFO, "evaluator requested at start up: " + evaluatorRequestorBridge.getEvaluatorNumber()); - } - } - } - } - - - /** - * Job driver is restarted after previous crash - */ - final class RestartHandler implements EventHandler<StartTime> { - @Override - public void onNext(final StartTime startTime) { - try (final LoggingScope ls = loggingScopeFactory.driverRestart(startTime)) { - synchronized (JobDriver.this) { - - setupBridge(startTime); - - JobDriver.this.isRestarted = true; - - LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up."); - } - } - } - } - - /** - * Receive notification that driver restart has completed. - */ - final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> { - @Override - public void onNext(final DriverRestartCompleted driverRestartCompleted) { - LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ", driverRestartCompleted.getTimeStamp()); - try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(driverRestartCompleted.getTimeStamp())) { - if (JobDriver.this.driverRestartHandler != 0) { - LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR."); - NativeInterop.ClrSystemDriverRestartHandlerOnNext(JobDriver.this.driverRestartHandler); - } else { - LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler."); - - } - } - } - } - - /** - * Shutting down the job driver: close the evaluators. - */ - final class StopHandler implements EventHandler<StopTime> { - @Override - public void onNext(final StopTime time) { - LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time}); - try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) { - for (final ActiveContext context : contexts.values()) { - context.close(); - } - } - } - } - - final class TaskMessageHandler implements EventHandler<TaskMessage> { - @Override - public void onNext(final TaskMessage taskMessage) { - String msg = new String(taskMessage.get()); - LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg); - //try (LoggingScope ls = loggingScopeFactory.taskMessageReceived(new String(msg))) { - if (JobDriver.this.taskMessageHandler != 0) { - TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage); - // if CLR implements the task message handler, handle the bytes in CLR handler - NativeInterop.ClrSystemTaskMessageHandlerOnNext(JobDriver.this.taskMessageHandler, taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger); - } - //} - } - } - - /** - * Receive notification that the Task has been suspended. - */ - final class SuspendedTaskHandler implements EventHandler<SuspendedTask> { - @Override - public final void onNext(final SuspendedTask task) { - final String message = "Received notification that task [" + task.getId() + "] has been suspended."; - LOG.log(Level.INFO, message); - try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) { - if (JobDriver.this.suspendedTaskHandler != 0) { - SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task); - // if CLR implements the suspended task handler, handle it in CLR - LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge."); - NativeInterop.ClrSystemSupendedTaskHandlerOnNext(JobDriver.this.suspendedTaskHandler, suspendedTaskBridge); - } - JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message)); - } - } - } - - /** - * Receive notification that the Evaluator has been shut down. - */ - final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> { - @Override - public void onNext(final CompletedEvaluator evaluator) { - LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId()); - try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) { - if (JobDriver.this.completedEvaluatorHandler != 0) { - CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator); - // if CLR implements the completed evaluator handler, handle it in CLR - LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge."); - NativeInterop.ClrSystemCompletdEvaluatorHandlerOnNext(completedEvaluatorHandler, completedEvaluatorBridge); - } - } - } - } - - - /** - * Receive notification that the Context had completed. - * Remove context from the list of active context. - */ - final class ClosedContextHandler implements EventHandler<ClosedContext> { - @Override - public void onNext(final ClosedContext context) { - LOG.log(Level.INFO, "Completed Context: {0}", context.getId()); - try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) { - if (JobDriver.this.closedContextHandler != 0) { - ClosedContextBridge closedContextBridge = new ClosedContextBridge(context); - // if CLR implements the closed context handler, handle it in CLR - LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge."); - NativeInterop.ClrSystemClosedContextHandlerOnNext(JobDriver.this.closedContextHandler, closedContextBridge); - } - synchronized (JobDriver.this) { - JobDriver.this.contexts.remove(context.getId()); - } - } - } - } - - - /** - * Receive notification that the Context had failed. - * Remove context from the list of active context and notify the client. - */ - final class FailedContextHandler implements EventHandler<FailedContext> { - @Override - public void onNext(final FailedContext context) { - LOG.log(Level.SEVERE, "FailedContext", context); - try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) { - if (JobDriver.this.failedContextHandler != 0) { - FailedContextBridge failedContextBridge = new FailedContextBridge(context); - // if CLR implements the failed context handler, handle it in CLR - LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge."); - NativeInterop.ClrSystemFailedContextHandlerOnNext(JobDriver.this.failedContextHandler, failedContextBridge); - } - synchronized (JobDriver.this) { - JobDriver.this.contexts.remove(context.getId()); - } - Optional<byte[]> err = context.getData(); - if (err.isPresent()) { - JobDriver.this.jobMessageObserver.sendMessageToClient(err.get()); - } - } - } - } - - /** - * Receive notification that a ContextMessage has been received - */ - final class ContextMessageHandler implements EventHandler<ContextMessage> { - @Override - public void onNext(final ContextMessage message) { - LOG.log(Level.SEVERE, "Received ContextMessage:", message.get()); - try (final LoggingScope ls = loggingScopeFactory.contextMessageReceived(message.get().toString())) { - if (JobDriver.this.contextMessageHandler != 0) { - ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message); - // if CLR implements the context message handler, handle it in CLR - LOG.log(Level.INFO, "Handling the event of context message in CLR bridge."); - NativeInterop.ClrSystemContextMessageHandlerOnNext(JobDriver.this.contextMessageHandler, contextMessageBridge); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java deleted file mode 100644 index b1473ee..0000000 --- a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.javabridge.generic; - -import org.apache.reef.client.ClientConfiguration; -import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; -import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.JavaConfigurationBuilder; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.annotations.Name; -import org.apache.reef.tang.annotations.NamedParameter; -import org.apache.reef.tang.exceptions.BindException; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.tang.formats.CommandLine; -import org.apache.reef.util.logging.LoggingScope; -import org.apache.reef.util.logging.LoggingScopeFactory; -import org.apache.reef.util.logging.LoggingScopeImpl; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Date; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Clr Bridge example - main class. - */ -public final class Launch { - - /** - * Number of REEF worker threads in local mode. We assume maximum 10 evaluators can be requested on local runtime - */ - private static final int NUM_LOCAL_THREADS = 10; - /** - * Standard Java logger - */ - private static final Logger LOG = Logger.getLogger(Launch.class.getName()); - - /** - * This class should not be instantiated. - */ - private Launch() { - throw new RuntimeException("Do not instantiate this class!"); - } - - /** - * Parse the command line arguments. - * - * @param args command line arguments, as passed to main() - * @return Configuration object. - * @throws org.apache.reef.tang.exceptions.BindException configuration error. - * @throws java.io.IOException error reading the configuration. - */ - private static Configuration parseCommandLine(final String[] args) - throws BindException, IOException { - final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder(); - final CommandLine cl = new CommandLine(confBuilder); - cl.registerShortNameOfClass(Local.class); - cl.registerShortNameOfClass(NumRuns.class); - cl.registerShortNameOfClass(WaitTimeForDriver.class); - cl.registerShortNameOfClass(DriverMemoryInMb.class); - cl.registerShortNameOfClass(DriverIdentifier.class); - cl.registerShortNameOfClass(DriverJobSubmissionDirectory.class); - cl.registerShortNameOfClass(Submit.class); - cl.processCommandLine(args); - return confBuilder.build(); - } - - private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf) - throws InjectionException, BindException { - final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf); - final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); - cb.bindNamedParameter(NumRuns.class, String.valueOf(injector.getNamedInstance(NumRuns.class))); - return cb.build(); - } - - /** - * Parse command line arguments and create TANG configuration ready to be submitted to REEF. - * - * @param args Command line arguments, as passed into main(). - * @return (immutable) TANG Configuration object. - * @throws org.apache.reef.tang.exceptions.BindException if configuration commandLineInjector fails. - * @throws org.apache.reef.tang.exceptions.InjectionException if configuration commandLineInjector fails. - * @throws java.io.IOException error reading the configuration. - */ - private static Configuration getClientConfiguration(final String[] args) - throws BindException, InjectionException, IOException { - - try (final LoggingScope ls = LoggingScopeFactory.getNewLoggingScope(Level.INFO, "Launch::getClientConfiguration")) { - final Configuration commandLineConf = parseCommandLine(args); - - final Configuration clientConfiguration = ClientConfiguration.CONF - .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class) - .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class) - .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class) - //.set(ClientConfiguration.ON_WAKE_ERROR, JobClient.WakeErrorHandler.class ) - .build(); - - // TODO: Remove the injector, have stuff injected. - final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf); - final boolean isLocal = commandLineInjector.getNamedInstance(Local.class); - final Configuration runtimeConfiguration; - if (isLocal) { - LOG.log(Level.INFO, "Running on the local runtime"); - runtimeConfiguration = LocalRuntimeConfiguration.CONF - .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS) - .build(); - } else { - LOG.log(Level.INFO, "Running on YARN"); - runtimeConfiguration = YarnClientConfiguration.CONF.build(); - } - - return Tang.Factory.getTang() - .newConfigurationBuilder(runtimeConfiguration, clientConfiguration, - cloneCommandLineConfiguration(commandLineConf)) - .build(); - } - } - - /** - * Main method that starts the CLR Bridge from Java - * - * @param args command line parameters. - */ - public static void main(final String[] args) { - LOG.log(Level.INFO, "Entering Launch at :::" + new Date()); - try { - if (args == null || args.length == 0) { - throw new IllegalArgumentException("No arguments provided, at least a clrFolder should be supplied."); - } - final File dotNetFolder = new File(args[0]).getAbsoluteFile(); - String[] removedArgs = Arrays.copyOfRange(args, 1, args.length); - - final Configuration config = getClientConfiguration(removedArgs); - final Injector commandLineInjector = Tang.Factory.getTang().newInjector(parseCommandLine(removedArgs)); - final int waitTime = commandLineInjector.getNamedInstance(WaitTimeForDriver.class); - final int driverMemory = commandLineInjector.getNamedInstance(DriverMemoryInMb.class); - final String driverIdentifier = commandLineInjector.getNamedInstance(DriverIdentifier.class); - final String jobSubmissionDirectory = commandLineInjector.getNamedInstance(DriverJobSubmissionDirectory.class); - final boolean submit = commandLineInjector.getNamedInstance(Submit.class); - final Injector injector = Tang.Factory.getTang().newInjector(config); - final JobClient client = injector.getInstance(JobClient.class); - client.setDriverInfo(driverIdentifier, driverMemory, jobSubmissionDirectory); - - if (submit) { - client.submit(dotNetFolder, true, null); - client.waitForCompletion(waitTime); - } else { - client.submit(dotNetFolder, false, config); - client.waitForCompletion(0); - } - - - LOG.info("Done!"); - } catch (final BindException | InjectionException | IOException ex) { - LOG.log(Level.SEVERE, "Job configuration error", ex); - } - } - - /** - * Command line parameter: number of experiments to run. - */ - @NamedParameter(doc = "Number of times to run the command", - short_name = "num_runs", default_value = "1") - public static final class NumRuns implements Name<Integer> { - } - - /** - * Command line parameter = true to run locally, or false to run on YARN. - */ - @NamedParameter(doc = "Whether or not to run on the local runtime", - short_name = "local", default_value = "true") - public static final class Local implements Name<Boolean> { - } - - /** - * Command line parameter, number of seconds to wait till driver finishes , - * = -1 : waits forever - * = 0: exit immediately without wait for driver. - */ - @NamedParameter(doc = "Whether or not to wait for driver to finish", - short_name = "wait_time", default_value = "-1") - public static final class WaitTimeForDriver implements Name<Integer> { - } - - /** - * Command line parameter, driver memory, in MB - */ - @NamedParameter(doc = "memory allocated to driver JVM", - short_name = "driver_memory", default_value = "512") - public static final class DriverMemoryInMb implements Name<Integer> { - } - - /** - * Command line parameter, driver identifier - */ - @NamedParameter(doc = "driver identifier for clr bridge", - short_name = "driver_id", default_value = "ReefClrBridge") - public static final class DriverIdentifier implements Name<String> { - } - - /** - * Command line parameter = true to submit the job with driver config, or false to write config to current directory - */ - @NamedParameter(doc = "Whether or not to submit the reef job after driver config is constructed", - short_name = "submit", default_value = "true") - public static final class Submit implements Name<Boolean> { - } - - /** - * Command line parameter, job submission directory, if set, user should guarantee its uniqueness - */ - @NamedParameter(doc = "driver job submission directory", - short_name = "submission_directory", default_value = "empty") - public static final class DriverJobSubmissionDirectory implements Name<String> { - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java deleted file mode 100644 index ba2a5cb..0000000 --- a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.javabridge.generic; - -import org.apache.reef.client.DriverConfiguration; -import org.apache.reef.client.REEF; -import org.apache.reef.runtime.common.client.REEFImplementation; -import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Configurations; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.exceptions.BindException; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.tang.formats.ConfigurationModule; - -import java.io.File; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Clr Bridge example - main class. - */ -public final class LaunchHeadless { - - /** - * Standard Java logger - */ - private static final Logger LOG = Logger.getLogger(LaunchHeadless.class.getName()); - - /** - * This class should not be instantiated. - */ - private LaunchHeadless() { - throw new RuntimeException("Do not instantiate this class!"); - } - - - /** - * Parse command line arguments and create TANG configuration ready to be submitted to REEF. - * - * @param args Command line arguments, as passed into main(). - * @return (immutable) TANG Configuration object. - * @throws org.apache.reef.tang.exceptions.BindException if configuration commandLineInjector fails. - * @throws org.apache.reef.tang.exceptions.InjectionException if configuration commandLineInjector fails. - * @throws java.io.IOException error reading the configuration. - */ - - /** - * Main method that starts the CLR Bridge from Java - * - * @param args command line parameters. - */ - public static void main(final String[] args) { - try { - if (args == null || args.length == 0) { - throw new IllegalArgumentException("No arguments provided, at least a clrFolder should be supplied."); - } - final File dotNetFolder = new File(args[0]).getAbsoluteFile(); - - ConfigurationModule driverConfigModule = JobClient.getDriverConfiguration(); - - ConfigurationModule result = driverConfigModule; - for (final File f : dotNetFolder.listFiles()) { - if (f.canRead() && f.exists() && f.isFile()) { - result = result.set(DriverConfiguration.GLOBAL_FILES, f.getAbsolutePath()); - } - } - - driverConfigModule = result; - Configuration driverConfiguration = Configurations.merge(driverConfigModule.build(), JobClient.getHTTPConfiguration()); - - LOG.log(Level.INFO, "Running on YARN"); - - final Configuration runtimeConfiguration = YarnClientConfiguration.CONF.build(); - - final REEF reef = Tang.Factory.getTang().newInjector(runtimeConfiguration).getInstance(REEFImplementation.class); - reef.submit(driverConfiguration); - - LOG.info("Done!"); - } catch (final BindException | InjectionException ex) { - LOG.log(Level.SEVERE, "Job configuration error", ex); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java deleted file mode 100644 index d93f6f4..0000000 --- a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Generic java bridge driver/client - */ -package org.apache.reef.javabridge.generic; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java deleted file mode 100644 index 46629c9..0000000 --- a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.util.logging; - -import org.apache.reef.javabridge.NativeInterop; - -import javax.inject.Inject; -import java.util.ArrayList; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Handler; -import java.util.logging.Level; -import java.util.logging.LogRecord; -import java.util.logging.SimpleFormatter; - -/** - * Logging Handler to intercept java logs and transfer them - * to the CLR side via the reef-bridge. - * <p/> - * Logs are buffered to avoid the cost of reef-bridge function calls. - * A thread is also scheduled to flush the log buffer at a certain interval, - * in case the log buffer remains unfilled for an extended period of time. - */ -public class CLRBufferedLogHandler extends Handler { - private static final int BUFFER_LEN = 10; - private static final int NUM_THREADS = 1; - private static final long LOG_SCHEDULE_PERIOD = 15; // seconds - private SimpleFormatter formatter; - private ArrayList<LogRecord> logs; - private boolean driverInitialized; - private ScheduledThreadPoolExecutor logScheduler; - - @Inject - public CLRBufferedLogHandler() { - super(); - this.formatter = new SimpleFormatter(); - this.logs = new ArrayList<LogRecord>(); - this.driverInitialized = false; - this.logScheduler = new ScheduledThreadPoolExecutor(NUM_THREADS); - } - - /** - * Signals the java-bridge has been initialized and that we can begin logging. - * Usually called from the StartHandler after the driver is up. - */ - public void setDriverInitialized() { - synchronized (this) { - this.driverInitialized = true; - } - startLogScheduler(); - } - - /** - * Called whenever a log message is received on the java side. - * <p/> - * Adds the log record to the log buffer. If the log buffer is full and - * the driver has already been initialized, flush the buffer of the logs. - */ - @Override - public void publish(LogRecord record) { - if (record == null) - return; - - if (!isLoggable(record)) - return; - - synchronized (this) { - this.logs.add(record); - if (!this.driverInitialized || this.logs.size() < BUFFER_LEN) - return; - } - - logAll(); - } - - @Override - public void flush() { - logAll(); - } - - /** - * Flushes the remaining buffered logs and shuts down the log scheduler thread. - */ - @Override - public synchronized void close() throws SecurityException { - if (driverInitialized) { - this.logAll(); - } - this.logScheduler.shutdown(); - } - - /** - * Starts a thread to flush the log buffer on an interval. - * <p/> - * This will ensure that logs get flushed periodically, even - * if the log buffer is not full. - */ - private void startLogScheduler() { - this.logScheduler.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - CLRBufferedLogHandler.this.logAll(); - } - }, 0, LOG_SCHEDULE_PERIOD, TimeUnit.SECONDS); - } - - /** - * Flushes the log buffer, logging each buffered log message using - * the reef-bridge log function. - */ - private void logAll() { - synchronized (this) { - final StringBuilder sb = new StringBuilder(); - Level highestLevel = Level.FINEST; - for (final LogRecord record : this.logs) { - sb.append(formatter.format(record)); - sb.append("\n"); - if (record.getLevel().intValue() > highestLevel.intValue()) { - highestLevel = record.getLevel(); - } - } - try { - final int level = getLevel(highestLevel); - NativeInterop.ClrBufferedLog(level, sb.toString()); - } catch (Exception e) { - System.err.println("Failed to perform CLRBufferedLogHandler"); - } - - this.logs.clear(); - } - } - - /** - * Returns the integer value of the log record's level to be used - * by the CLR Bridge log function. - */ - private int getLevel(Level recordLevel) { - if (recordLevel.equals(Level.OFF)) { - return 0; - } else if (recordLevel.equals(Level.SEVERE)) { - return 1; - } else if (recordLevel.equals(Level.WARNING)) { - return 2; - } else if (recordLevel.equals(Level.ALL)) { - return 4; - } else { - return 3; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java deleted file mode 100644 index 7d82937..0000000 --- a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.util.logging; - -import java.io.IOException; -import java.util.logging.LogManager; - -public final class CLRLoggingConfig { - - public CLRLoggingConfig() throws IOException { - LogManager.getLogManager().readConfiguration( - Thread.currentThread().getContextClassLoader() - .getResourceAsStream("com/microsoft/reef/clr.logging.properties")); - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java deleted file mode 100644 index e0e79ce..0000000 --- a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Logging handler for clr bridge - */ -package org.apache.reef.util.logging; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties b/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties deleted file mode 100644 index 41c4024..0000000 --- a/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties +++ /dev/null @@ -1,82 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Properties file which configures the operation of the JDK -# logging facility. - -# The system will look for this config file, first using -# a System property specified at startup: -# -# >java -Djava.utils.logging.config.file=myLoggingConfigFilePath -# -# If this property is not specified, then the config file is -# retrieved from its default location at: -# -# JDK_HOME/jre/lib/logging.properties - -# Global logging properties. -# ------------------------------------------ -# The set of handlers to be loaded upon startup. -# Comma-separated list of class names. -# (? LogManager docs say no comma here, but JDK example has comma.) -# handlers=java.utils.logging.FileHandler, java.utils.logging.ConsoleHandler -handlers=java.util.logging.ConsoleHandler,org.apache.reef.util.logging.CLRBufferedLogHandler - -java.util.logging.SimpleFormatter.format=%1$tF %1$tT,%1$tL %4$s %2$s - %5$s%6$s%n - -# Default global logging level. -# Loggers and Handlers may override this level -.level=ALL - -# Loggers -# ------------------------------------------ -# Loggers are usually attached to packages. -# Here, the level for each package is specified. -# The global level is used by default, so levels -# specified here simply act as an override. - -# org.apache.reef.examples.level=FINEST -# org.apache.reef.tang.level=INFO - -# Handlers -# ----------------------------------------- - -# --- ConsoleHandler --- -# Override of global logging level -java.util.logging.ConsoleHandler.level=FINEST -java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter - -# --- FileHandler --- -# Override of global logging level -java.util.logging.FileHandler.level=FINEST - -# Naming style for the output file: -# (The output file is placed in the directory -# defined by the "user.home" System property.) -java.util.logging.FileHandler.pattern=%h/reef.%u.log - -# Limiting size of output file in bytes: -java.util.logging.FileHandler.limit=512000 - -# Number of output files to cycle through, by appending an -# integer to the base file name: -java.util.logging.FileHandler.count=100 - -# Style of output (Simple or XML): -java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-project/reef-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-project/reef-bridge/pom.xml b/lang/java/reef-bridge-project/reef-bridge/pom.xml deleted file mode 100644 index 774e5f8..0000000 --- a/lang/java/reef-bridge-project/reef-bridge/pom.xml +++ /dev/null @@ -1,111 +0,0 @@ -<?xml version="1.0"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <artifactId>reef-bridge</artifactId> - <name>REEF Bridge</name> - <description>Bridge between JVM and CLR.</description> - - - <parent> - <groupId>org.apache.reef</groupId> - <artifactId>reef-bridge-project</artifactId> - <version>0.11.0-incubating-SNAPSHOT</version> - </parent> - - - <dependencies> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>reef-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>reef-runtime-local</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>reef-runtime-yarn</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>reef-io</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>reef-checkpoint</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>reef-bridge-java</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>reef-bridge-clr</artifactId> - <version>${project.version}</version> - </dependency> - - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <id>unpack-dependencies</id> - <phase>process-resources</phase> - <goals> - <goal>unpack-dependencies</goal> - </goals> - - <configuration> - <includeArtifactIds>reef-bridge-java,reef-bridge-clr</includeArtifactIds> - <outputDirectory> - ${project.build.directory}/classes/ReefDriverAppDlls - </outputDirectory> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <archive> - <manifest> - <addClasspath>false</addClasspath> - <classpathPrefix>lib/</classpathPrefix> - <mainClass>org.apache.reef.javabridge.JavaBridge</mainClass> - </manifest> - </archive> - </configuration> - </plugin> - - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/reef-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/lang/reef-bridge/pom.xml b/lang/reef-bridge/pom.xml new file mode 100644 index 0000000..9c13086 --- /dev/null +++ b/lang/reef-bridge/pom.xml @@ -0,0 +1,139 @@ +<?xml version="1.0"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <artifactId>reef-bridge</artifactId> + <name>REEF Bridge</name> + <description>Bridge between JVM and CLR.</description> + + <parent> + <groupId>org.apache.reef</groupId> + <artifactId>reef-project</artifactId> + <version>0.11.0-incubating-SNAPSHOT</version> + <relativePath>../..</relativePath> + </parent> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-local</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-yarn</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-io</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-checkpoint</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-bridge-java</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-bridge-clr</artifactId> + <version>${project.version}</version> + </dependency> + + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>unpack-dependencies</id> + <phase>process-resources</phase> + <goals> + <goal>unpack-dependencies</goal> + </goals> + + <configuration> + <includeArtifactIds>reef-bridge-java,reef-bridge-clr</includeArtifactIds> + <outputDirectory> + ${project.build.directory}/classes/ReefDriverAppDlls + </outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifest> + <addClasspath>false</addClasspath> + <classpathPrefix>lib/</classpathPrefix> + <mainClass>org.apache.reef.javabridge.JavaBridge</mainClass> + </manifest> + </archive> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + <configuration> + <outputFile> + ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar + </outputFile> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>yarn-default.xml</exclude> + <exclude>yarn-version-info.properties</exclude> + <exclude>core-default.xml</exclude> + <exclude>LICENSE</exclude> + <exclude>META-INF/*</exclude> + </excludes> + </filter> + </filters> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index df5bb0f..ab885f9 100644 --- a/pom.xml +++ b/pom.xml @@ -615,7 +615,9 @@ under the License. <modules> <module>lang/java/reef-annotations</module> - <module>lang/java/reef-bridge-project</module> + <module>lang/java/reef-bridge-java</module> + <module>lang/cpp/reef-bridge-clr</module> + <module>lang/reef-bridge</module> <module>lang/java/reef-checkpoint</module> <module>lang/java/reef-common</module> <module>lang/java/reef-examples</module> @@ -665,7 +667,7 @@ under the License. </os> </activation> <modules> - <module>lang/java/reef-bridge-project</module> + <module>lang/reef-bridge</module> </modules> </profile>
