[REEF-65] Replace Retained Evaluators example with the Scheduler. This removes the Retained Evaluators example.
JIRA: [REEF-65](https://issues.apache.org/jira/browse/REEF-65) Pull Request: This closes #163 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/b9f38b25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/b9f38b25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/b9f38b25 Branch: refs/heads/master Commit: b9f38b25d4253530d7f72c8178f1424beb8b6d28 Parents: b2e85d1 Author: Yunseong Lee <[email protected]> Authored: Sat Apr 25 21:27:47 2015 +0900 Committer: Markus Weimer <[email protected]> Committed: Tue Apr 28 13:06:55 2015 -0700 ---------------------------------------------------------------------- bin/run.sh | 2 +- .../Org.Apache.REEF.Examples.csproj | 6 +- .../RetainedEvalActiveContextHandler.cs | 56 --- .../RetainedEvalAllocatedEvaluatorHandler.cs | 48 -- .../RetainedEvalEvaluatorRequestorHandler.cs | 47 -- .../Handlers/RetainedEvalStartHandler.cs | 89 ---- lang/java/reef-examples-clr/pom.xml | 35 -- .../examples/retained_evalCLR/JobClient.java | 317 ------------ .../examples/retained_evalCLR/JobDriver.java | 489 ------------------- .../reef/examples/retained_evalCLR/Launch.java | 189 ------- .../examples/retained_evalCLR/package-info.java | 22 - lang/java/reef-examples/pom.xml | 59 --- .../reef/examples/retained_eval/JobClient.java | 335 ------------- .../reef/examples/retained_eval/JobDriver.java | 370 -------------- .../reef/examples/retained_eval/Launch.java | 185 ------- .../examples/retained_eval/package-info.java | 22 - .../examples/scheduler/SchedulerDriver.java | 6 +- .../reef/tests/examples/ExamplesTestSuite.java | 3 +- .../tests/examples/TestRetainedEvaluators.java | 81 --- 19 files changed, 6 insertions(+), 2355 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/bin/run.sh 
---------------------------------------------------------------------- diff --git a/bin/run.sh b/bin/run.sh index 21ae5fc..b5bb0a4 100755 --- a/bin/run.sh +++ b/bin/run.sh @@ -19,7 +19,7 @@ # # EXAMPLE USAGE -# ./run.sh org.apache.reef.examples.retained_eval.Launch -num_eval 2 -local false +# ./run.sh org.apache.reef.examples.hello.HelloREEF # RUNTIME SELF_JAR=`echo $REEF_HOME/reef-examples/target/reef-examples-*-shaded.jar` http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj index abed026..1e342ed 100644 --- a/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj +++ b/lang/cs/Org.Apache.REEF.Examples/Org.Apache.REEF.Examples.csproj @@ -69,10 +69,6 @@ under the License. <Compile Include="MachineLearning\KMeans\LegacyKMeansTask.cs" /> <Compile Include="MachineLearning\KMeans\PartialMean.cs" /> <Compile Include="MachineLearning\KMeans\ProcessedResults.cs" /> - <Compile Include="RetainedEvalCLRBridge\Handlers\RetainedEvalActiveContextHandler.cs" /> - <Compile Include="RetainedEvalCLRBridge\Handlers\RetainedEvalAllocatedEvaluatorHandler.cs" /> - <Compile Include="RetainedEvalCLRBridge\Handlers\RetainedEvalEvaluatorRequestorHandler.cs" /> - <Compile Include="RetainedEvalCLRBridge\Handlers\RetainedEvalStartHandler.cs" /> <Compile Include="Tasks\FailedTask\FailedTask.cs" /> <Compile Include="Tasks\HelloTask\HelloService.cs" /> <Compile Include="Tasks\HelloTask\HelloTask.cs" /> @@ -117,4 +113,4 @@ under the License. 
<Target Name="AfterBuild"> </Target> --> -</Project> \ No newline at end of file +</Project> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalActiveContextHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalActiveContextHandler.cs b/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalActiveContextHandler.cs deleted file mode 100644 index 5961e85..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalActiveContextHandler.cs +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using System; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Examples.Tasks.ShellTask; -using Org.Apache.REEF.Tang.Implementations.Tang; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; - -namespace Org.Apache.REEF.Examples.RetainedEvalCLRBridge.Handlers -{ - public class RetainedEvalActiveContextHandler : IObserver<IActiveContext> - { - public void OnNext(IActiveContext activeContext) - { - ICsConfigurationBuilder cb = TangFactory.GetTang().NewConfigurationBuilder(); - cb.AddConfiguration(TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, "bridgeCLRShellTask_" + DateTime.Now.Ticks) - .Set(TaskConfiguration.Task, GenericType<ShellTask>.Class) - .Build()); - cb.BindNamedParameter<ShellTask.Command, string>(GenericType<ShellTask.Command>.Class, "echo"); - - IConfiguration taskConfiguration = cb.Build(); - - activeContext.SubmitTask(taskConfiguration); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalAllocatedEvaluatorHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalAllocatedEvaluatorHandler.cs b/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalAllocatedEvaluatorHandler.cs deleted file mode 100644 index c591382..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalAllocatedEvaluatorHandler.cs +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Tang.Interface; - -namespace Org.Apache.REEF.Examples.RetainedEvalCLRBridge.Handlers -{ - public class RetainedEvalAllocatedEvaluatorHandler : IObserver<IAllocatedEvaluator> - { - public void OnCompleted() - { - throw new NotImplementedException(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnNext(IAllocatedEvaluator allocatedEvaluator) - { - IConfiguration contextConfiguration = ContextConfiguration.ConfigurationModule - .Set(ContextConfiguration.Identifier, "RetainedEvalCLRBridgeContextId") - .Build(); - - allocatedEvaluator.SubmitContext(contextConfiguration); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalEvaluatorRequestorHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalEvaluatorRequestorHandler.cs b/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalEvaluatorRequestorHandler.cs deleted file mode 100644 index 172cd4e..0000000 --- 
a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalEvaluatorRequestorHandler.cs +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Driver.Evaluator; - -namespace Org.Apache.REEF.Examples.RetainedEvalCLRBridge.Handlers -{ - public class RetainedEvalEvaluatorRequestorHandler : IObserver<IEvaluatorRequestor> - { - public void OnNext(IEvaluatorRequestor requestor) - { - int evaluatorsNumber = 1; - int memory = 512; - string rack = "WonderlandRack"; - EvaluatorRequest request = new EvaluatorRequest(evaluatorsNumber, memory, rack); - - requestor.Submit(request); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalStartHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalStartHandler.cs 
b/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalStartHandler.cs deleted file mode 100644 index cc2448e..0000000 --- a/lang/cs/Org.Apache.REEF.Examples/RetainedEvalCLRBridge/Handlers/RetainedEvalStartHandler.cs +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using System; -using System.Collections.Generic; -using System.Linq; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Examples.Tasks.ShellTask; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Examples.RetainedEvalCLRBridge.Handlers -{ - public class RetainedEvalStartHandler : IStartHandler - { - private static ClrSystemHandler<IEvaluatorRequestor> _evaluatorRequestorHandler; - private static ClrSystemHandler<IAllocatedEvaluator> _allocatedEvaluatorHandler; - private static ClrSystemHandler<IActiveContext> _activeContextHandler; - - [Inject] - public RetainedEvalStartHandler() - { - CreateClassHierarchy(); - Identifier = "RetainedEvalStartHandler"; - } - - public RetainedEvalStartHandler(string id) - { - Identifier = id; - CreateClassHierarchy(); - } - - public string Identifier { get; set; } - - public IList<ulong> GetHandlers() - { - ulong[] handlers = Enumerable.Repeat(Constants.NullHandler, Constants.HandlersNumber).ToArray(); - - // initiate Evaluator Requestor handler - _evaluatorRequestorHandler = new ClrSystemHandler<IEvaluatorRequestor>(); - handlers[Constants.Handlers[Constants.EvaluatorRequestorHandler]] = ClrHandlerHelper.CreateHandler(_evaluatorRequestorHandler); - Console.WriteLine("_evaluatorRequestorHandler initiated"); - _evaluatorRequestorHandler.Subscribe(new RetainedEvalEvaluatorRequestorHandler()); - - // initiate Allocated Evaluator handler - _allocatedEvaluatorHandler = new ClrSystemHandler<IAllocatedEvaluator>(); - handlers[Constants.Handlers[Constants.AllocatedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_allocatedEvaluatorHandler); - Console.WriteLine("_allocatedEvaluatorHandler initiated"); - _allocatedEvaluatorHandler.Subscribe(new RetainedEvalAllocatedEvaluatorHandler()); - - // initiate Active Context handler - _activeContextHandler = new 
ClrSystemHandler<IActiveContext>(); - handlers[Constants.Handlers[Constants.ActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_activeContextHandler); - Console.WriteLine("_activeContextHandler initiated"); - _activeContextHandler.Subscribe(new RetainedEvalActiveContextHandler()); - - return handlers; - } - - private void CreateClassHierarchy() - { - HashSet<string> clrDlls = new HashSet<string>(); - clrDlls.Add(typeof(IDriver).Assembly.GetName().Name); - clrDlls.Add(typeof(ITask).Assembly.GetName().Name); - clrDlls.Add(typeof(ShellTask).Assembly.GetName().Name); - - ClrHandlerHelper.GenerateClassHierarchy(clrDlls); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples-clr/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples-clr/pom.xml b/lang/java/reef-examples-clr/pom.xml index 2cb2171..33e5c6a 100644 --- a/lang/java/reef-examples-clr/pom.xml +++ b/lang/java/reef-examples-clr/pom.xml @@ -116,42 +116,7 @@ under the License. 
</plugins> </build> - <profiles> - - - <profile> - <id>RetainedEvalCLR</id> - <build> - <defaultGoal>exec:exec</defaultGoal> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>exec-maven-plugin</artifactId> - <configuration> - <executable>java</executable> - <arguments> - <argument>-classpath</argument> - <classpath/> - <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config - </argument> - <argument>-Dcom.microsoft.reef.runtime.local.folder=${project.build.directory} - </argument> - <argument>org.apache.reef.examples.retained_evalCLR.Launch</argument> - <argument>dotnetDistributedShell</argument> - <!-- <argument>-cmd</argument> - <argument>date</argument> - <argument>-num_runs</argument> - <argument>20</argument> - <argument>-local</argument> - <argument>true</argument> --> - </arguments> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> <id>HelloCLR</id> <build> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobClient.java b/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobClient.java deleted file mode 100644 index 8823c87..0000000 --- a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobClient.java +++ /dev/null @@ -1,317 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.examples.retained_evalCLR; - -import org.apache.reef.client.*; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.annotations.NamedParameter; -import org.apache.reef.tang.annotations.Parameter; -import org.apache.reef.tang.annotations.Unit; -import org.apache.reef.tang.exceptions.BindException; -import org.apache.reef.tang.formats.ConfigurationModule; -import org.apache.reef.util.EnvironmentUtils; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; - -import javax.inject.Inject; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Retained Evaluator Shell Client. - */ -@Unit -public class JobClient { - - /** - * Standard java logger. - */ - private static final Logger LOG = Logger.getLogger(JobClient.class.getName()); - - /** - * Codec to translate messages to and from the job driver - */ - private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); - - /** - * Reference to the REEF framework. - * This variable is injected automatically in the constructor. - */ - private final REEF reef; - - /** - * Shell command to submitTask to the job driver. - */ - private final String command; - /** - * If true, take commands from stdin; otherwise, use -cmd parameter in batch mode. - */ - private final boolean isInteractive; - /** - * Total number of experiments to run. 
- */ - private final int maxRuns; - /** - * Command prompt reader for the interactive mode (stdin). - */ - private final BufferedReader prompt; - /** - * Job Driver configuration. - */ - private Configuration driverConfiguration; - private ConfigurationModule driverConfigModule; - /** - * A reference to the running job that allows client to send messages back to the job driver - */ - private RunningJob runningJob; - - /** - * Start timestamp of the current task. - */ - private long startTime = 0; - - /** - * Total time spent performing tasks in Evaluators. - */ - private long totalTime = 0; - - /** - * Number of experiments ran so far. - */ - private int numRuns = 0; - - /** - * Set to false when job driver is done. - */ - private boolean isBusy = true; - - /** - * Retained Evaluator client. - * Parameters are injected automatically by TANG. - * - * @param command Shell command to run on each Evaluator. - * @param reef Reference to the REEF framework. - */ - @Inject - JobClient(final REEF reef, - @Parameter(Launch.Command.class) final String command, - @Parameter(Launch.NumRuns.class) final Integer numRuns) throws BindException { - - this.reef = reef; - this.command = command; - this.maxRuns = numRuns; - - // If command is not set, switch to interactive mode. (Yes, we compare pointers here) - this.isInteractive = this.command == - Launch.Command.class.getAnnotation(NamedParameter.class).default_value(); - - this.prompt = this.isInteractive ? 
new BufferedReader(new InputStreamReader(System.in)) : null; - - this.driverConfigModule = DriverConfiguration.CONF - .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class)) - .set(DriverConfiguration.DRIVER_IDENTIFIER, "eval-" + System.currentTimeMillis()) - .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class) - .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class) - .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class) - .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class) - .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class) - .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class) - .set(DriverConfiguration.ON_CLIENT_MESSAGE, JobDriver.ClientMessageHandler.class) - .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class) - .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class); - } - - private void addCLRFiles(final File folder) throws BindException { - ConfigurationModule result = this.driverConfigModule; - for (final File f : folder.listFiles()) { - if (f.canRead() && f.exists() && f.isFile()) { - result = result.set(DriverConfiguration.GLOBAL_FILES, f.getAbsolutePath()); - } - } - - this.driverConfigModule = result; - this.driverConfiguration = this.driverConfigModule.build(); - } - - /** - * Launch the job driver. - * - * @throws BindException configuration error. - */ - public void submit(File clrFolder) { - try { - addCLRFiles(clrFolder); - } catch (final BindException e) { - LOG.log(Level.FINE, "Failed to bind", e); - } - this.reef.submit(this.driverConfiguration); - } - - /** - * Send command to the job driver. Record timestamp when the command was sent. - * If this.command is set, use it; otherwise, ask user for the command. 
- */ - private synchronized void submitTask() { - if (this.isInteractive) { - String cmd; - try { - do { - System.out.print("\nRE> "); - cmd = this.prompt.readLine(); - } while (cmd != null && cmd.trim().isEmpty()); - } catch (final IOException ex) { - LOG.log(Level.FINE, "Error reading from stdin: {0}", ex); - cmd = null; - } - if (cmd == null || cmd.equals("exit")) { - this.runningJob.close(); - stopAndNotify(); - } else { - this.submitTask(cmd); - } - } else { - // non-interactive batch mode: - this.submitTask(this.command); - } - } - - /** - * Send command to the job driver. Record timestamp when the command was sent. - * - * @param cmd shell command to execute in all Evaluators. - */ - private synchronized void submitTask(final String cmd) { - LOG.log(Level.INFO, "Submit task {0} \"{1}\" to {2}", - new Object[]{this.numRuns + 1, cmd, this.runningJob}); - this.startTime = System.currentTimeMillis(); - this.runningJob.send(CODEC.encode(cmd)); - } - - /** - * Notify the process in waitForCompletion() method that the main process has finished. - */ - private synchronized void stopAndNotify() { - this.runningJob = null; - this.isBusy = false; - this.notify(); - } - - /** - * Wait for the job driver to complete. This method is called from Launcher.main() - */ - public void waitForCompletion() { - while (this.isBusy) { - LOG.info("Waiting for the Job Driver to complete."); - try { - synchronized (this) { - this.wait(); - } - } catch (final InterruptedException ex) { - LOG.log(Level.WARNING, "Waiting for result interrupted.", ex); - } - } - this.reef.close(); - } - - /** - * Receive notification from the job driver that the job is running. 
- */ - final class RunningJobHandler implements EventHandler<RunningJob> { - @Override - public void onNext(final RunningJob job) { - LOG.log(Level.INFO, "Running job: {0}", job.getId()); - synchronized (JobClient.this) { - JobClient.this.runningJob = job; - JobClient.this.submitTask(); - } - } - } - - /** - * Receive message from the job driver. - * There is only one message, which comes at the end of the driver execution - * and contains shell command output on each node. - */ - final class JobMessageHandler implements EventHandler<JobMessage> { - @Override - public void onNext(final JobMessage message) { - synchronized (JobClient.this) { - - final String result = CODEC.decode(message.get()); - final long jobTime = System.currentTimeMillis() - startTime; - totalTime += jobTime; - ++numRuns; - - LOG.log(Level.INFO, "Task {0} completed in {1} msec.:\n{2}", - new Object[]{numRuns, jobTime, result}); - - System.out.println(result); - - if (runningJob != null) { - if (isInteractive || numRuns < maxRuns) { - submitTask(); - } else { - LOG.log(Level.INFO, - "All {0} tasks complete; Average task time: {1}. Closing the job driver.", - new Object[]{maxRuns, totalTime / (double) maxRuns}); - runningJob.close(); - stopAndNotify(); - } - } - } - } - } - - /** - * Receive notification from the job driver that the job had failed. - */ - final class FailedJobHandler implements EventHandler<FailedJob> { - @Override - public void onNext(final FailedJob job) { - LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null)); - stopAndNotify(); - } - } - - /** - * Receive notification from the job driver that the job had completed successfully. - */ - final class CompletedJobHandler implements EventHandler<CompletedJob> { - @Override - public void onNext(final CompletedJob job) { - LOG.log(Level.INFO, "Completed job: {0}", job.getId()); - stopAndNotify(); - } - } - - /** - * Receive notification that there was an exception thrown from the job driver. 
- */ - final class RuntimeErrorHandler implements EventHandler<FailedRuntime> { - @Override - public void onNext(final FailedRuntime error) { - LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getReason().orElse(null)); - stopAndNotify(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobDriver.java b/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobDriver.java deleted file mode 100644 index c4f95e1..0000000 --- a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/JobDriver.java +++ /dev/null @@ -1,489 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.examples.retained_evalCLR; - -import org.apache.reef.driver.catalog.ResourceCatalog; -import org.apache.reef.driver.client.JobMessageObserver; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.context.ClosedContext; -import org.apache.reef.driver.context.ContextConfiguration; -import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.driver.evaluator.*; -import org.apache.reef.driver.task.CompletedTask; -import org.apache.reef.driver.task.TaskConfiguration; -import org.apache.reef.examples.library.Command; -import org.apache.reef.examples.library.ShellTask; -import org.apache.reef.tang.*; -import org.apache.reef.tang.annotations.Unit; -import org.apache.reef.tang.exceptions.BindException; -import org.apache.reef.tang.implementation.protobuf.ProtocolBufferClassHierarchy; -import org.apache.reef.tang.proto.ClassHierarchyProto; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; -import org.apache.reef.wake.time.Clock; -import org.apache.reef.wake.time.event.Alarm; -import org.apache.reef.wake.time.event.StartTime; -import org.apache.reef.wake.time.event.StopTime; - -import javax.inject.Inject; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Retained Evaluator example job driver. Execute shell command on all evaluators, - * capture stdout, and return concatenated results back to the client. - */ -@Unit -public final class JobDriver { - public static final String SHELL_TASK_CLASS_HIERARCHY_FILENAME = "ShellTask.bin"; - /** - * Standard Java logger. - */ - private static final Logger LOG = Logger.getLogger(JobDriver.class.getName()); - /** - * Duration of one clock interval. 
- */ - private static final int CHECK_UP_INTERVAL = 1000; // 1 sec. - private static final String JVM_CONTEXT_SUFFIX = "_JVMContext"; - private static final String CLR_CONTEXT_SUFFIX = "_CLRContext"; - /** - * String codec is used to encode the results - * before passing them back to the client. - */ - private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>(); - public static int totalEvaluators = 2; - /** - * Wake clock is used to schedule periodical job check-ups. - */ - private final Clock clock; - /** - * Job observer on the client. - * We use it to send results from the driver back to the client. - */ - private final JobMessageObserver jobMessageObserver; - /** - * Job driver uses EvaluatorRequestor - * to request Evaluators that will run the Tasks. - */ - private final EvaluatorRequestor evaluatorRequestor; - /** - * Static catalog of REEF resources. - * We use it to schedule Task on every available node. - */ - private final ResourceCatalog catalog; - /** - * Shell execution results from each Evaluator. - */ - private final List<String> results = new ArrayList<>(); - /** - * Map from context ID to running evaluator context. - */ - private final Map<String, ActiveContext> contexts = new HashMap<>(); - private int nCLREvaluator = 1; // guarded by this - private int nJVMEvaluator = totalEvaluators - nCLREvaluator; // guarded by this - /** - * Job driver state. - */ - private State state = State.INIT; - /** - * First command to execute. Sometimes client can send us the first command - * before Evaluators are available; we need to store this command here. - */ - private String cmd; - /** - * Number of evaluators/tasks to complete. - */ - private int expectCount = 0; - - /** - * Job driver constructor. - * All parameters are injected from TANG automatically. - * - * @param clock Wake clock to schedule and check up running jobs. - * @param jobMessageObserver is used to send messages back to the client. 
- * @param evaluatorRequestor is used to request Evaluators. - */ - @Inject - JobDriver(final Clock clock, - final JobMessageObserver jobMessageObserver, - final EvaluatorRequestor evaluatorRequestor, - final ResourceCatalog catalog) { - this.clock = clock; - this.jobMessageObserver = jobMessageObserver; - this.evaluatorRequestor = evaluatorRequestor; - this.catalog = catalog; - } - - /** - * Makes a task configuration for the CLR ShellTask. - * - * @param taskId - * @return task configuration for the CLR Task. - * @throws BindException - */ - private static final Configuration getCLRTaskConfiguration( - final String taskId, final String command) throws BindException { - - final ConfigurationBuilder cb = Tang.Factory.getTang() - .newConfigurationBuilder(loadShellTaskClassHierarchy(SHELL_TASK_CLASS_HIERARCHY_FILENAME)); - - cb.bind("Org.Apache.Reef.Tasks.ITask, Org.Apache.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", "Org.Apache.Reef.Tasks.ShellTask, Org.Apache.Reef.Tasks.ShellTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"); - cb.bind("Org.Apache.Reef.Tasks.TaskConfigurationOptions+Identifier, Org.Apache.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", taskId); - cb.bind("Org.Apache.Reef.Tasks.ShellTask+Command, Org.Apache.Reef.Tasks.ShellTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null", command); - - return cb.build(); - } - - /** - * Makes a task configuration for the JVM ShellTask.. - * - * @param taskId - * @return task configuration for the JVM Task. 
- * @throws BindException - */ - private static final Configuration getJVMTaskConfiguration( - final String taskId, final String command) throws BindException { - - final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); - cb.addConfiguration( - TaskConfiguration.CONF - .set(TaskConfiguration.IDENTIFIER, taskId) - .set(TaskConfiguration.TASK, ShellTask.class) - .build() - ); - cb.bindNamedParameter(Command.class, command); - return cb.build(); - } - - /** - * Loads the class hierarchy. - * - * @return - */ - private static ClassHierarchy loadShellTaskClassHierarchy(String binFile) { - try (final InputStream chin = new FileInputStream(binFile)) { - final ClassHierarchyProto.Node root = ClassHierarchyProto.Node.parseFrom(chin); - final ClassHierarchy ch = new ProtocolBufferClassHierarchy(root); - return ch; - } catch (final IOException e) { - final String message = "Unable to load class hierarchy " + binFile; - LOG.log(Level.SEVERE, message, e); - throw new RuntimeException(message, e); - } - } - - private void submitEvaluator(final AllocatedEvaluator eval, EvaluatorType type) { - synchronized (JobDriver.this) { - - String contextIdSuffix = type.equals(EvaluatorType.JVM) ? JVM_CONTEXT_SUFFIX : CLR_CONTEXT_SUFFIX; - String contextId = eval.getId() + contextIdSuffix; - - eval.setType(type); - - LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}", - new Object[]{eval.getId(), JobDriver.this.expectCount, JobDriver.this.contexts.size()}); - assert (JobDriver.this.state == State.WAIT_EVALUATORS); - try { - eval.submitContext(ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, contextId).build()); - } catch (final BindException ex) { - LOG.log(Level.SEVERE, "Failed to submit context " + contextId, ex); - throw new RuntimeException(ex); - } - } - } - - /** - * Submit command to all available evaluators. - * - * @param command shell command to execute. 
- */ - private void submit(final String command) { - LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}", - new Object[]{command, this.contexts.size(), this.state}); - assert (this.state == State.READY); - this.expectCount = this.contexts.size(); - this.state = State.WAIT_TASKS; - this.cmd = null; - for (final ActiveContext context : this.contexts.values()) { - this.submit(context, command); - } - } - - /** - * Submit a Task that execute the command to a single Evaluator. - * This method is called from <code>submitTask(cmd)</code>. - */ - private void submit(final ActiveContext context, final String command) { - try { - LOG.log(Level.INFO, "Sending command {0} to context: {1}", new Object[]{command, context}); - String taskId = context.getId() + "_task"; - final Configuration taskConfiguration; - if (context.getId().endsWith(JVM_CONTEXT_SUFFIX)) { - taskConfiguration = getJVMTaskConfiguration(taskId, command); - } else { - taskConfiguration = getCLRTaskConfiguration(taskId, command); - } - context.submitTask(taskConfiguration); - } catch (final BindException ex) { - LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex); - throw new RuntimeException(ex); - } - } - - /** - * Construct the final result and forward it to the Client. - */ - private void returnResults() { - final StringBuilder sb = new StringBuilder(); - for (final String result : this.results) { - sb.append(result); - } - this.results.clear(); - LOG.log(Level.INFO, "Return results to the client:\n{0}", sb); - this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(sb.toString())); - } - - /** - * Request evaluators on each node. - * If nodes are not available yet, schedule another request in CHECK_UP_INTERVAL. - * TODO: Ask for specific nodes. (This is not working in YARN... need to check again at some point.) - * - * @throws RuntimeException if any of the requests fails. 
- */ - private synchronized void requestEvaluators() { - assert (this.state == State.INIT); - final int numNodes = totalEvaluators; - if (numNodes > 0) { - LOG.log(Level.INFO, "Schedule on {0} nodes.", numNodes); - this.evaluatorRequestor.submit( - EvaluatorRequest.newBuilder() - .setMemory(128) - .setNumberOfCores(1) - .setNumber(numNodes).build() - ); - this.state = State.WAIT_EVALUATORS; - this.expectCount = numNodes; - } else { - // No nodes available yet - wait and ask again. - this.clock.scheduleAlarm(CHECK_UP_INTERVAL, new EventHandler<Alarm>() { - @Override - public void onNext(final Alarm time) { - synchronized (JobDriver.this) { - LOG.log(Level.INFO, "{0} Alarm: {1}", new Object[]{JobDriver.this.state, time}); - if (JobDriver.this.state == State.INIT) { - JobDriver.this.requestEvaluators(); - } - } - } - }); - } - } - - /** - * Possible states of the job driver. Can be one of: - * <dl> - * <du><code>INIT</code></du><dd>initial state, ready to request the evaluators.</dd> - * <du><code>WAIT_EVALUATORS</code></du><dd>Wait for requested evaluators to initialize.</dd> - * <du><code>READY</code></du><dd>Ready to submitTask a new task.</dd> - * <du><code>WAIT_TASKS</code></du><dd>Wait for tasks to complete.</dd> - * </dl> - */ - private enum State { - INIT, WAIT_EVALUATORS, READY, WAIT_TASKS - } - - /** - * Handles AllocatedEvaluator: Submit an empty context - */ - final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { - @Override - public void onNext(final AllocatedEvaluator allocatedEvaluator) { - synchronized (JobDriver.this) { - if (JobDriver.this.nJVMEvaluator > 0) { - LOG.log(Level.INFO, "===== adding JVM evaluator ====="); - JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.JVM); - JobDriver.this.nJVMEvaluator -= 1; - } else if (JobDriver.this.nCLREvaluator > 0) { - LOG.log(Level.INFO, "===== adding CLR evaluator ====="); - JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.CLR); - 
JobDriver.this.nCLREvaluator -= 1; - } - } - } - } - - /** - * Receive notification that a new Context is available. - * Submit a new Distributed Shell Task to that Context. - */ - final class ActiveContextHandler implements EventHandler<ActiveContext> { - @Override - public void onNext(final ActiveContext context) { - synchronized (JobDriver.this) { - LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}", - new Object[]{context.getId(), JobDriver.this.expectCount, JobDriver.this.state}); - assert (JobDriver.this.state == State.WAIT_EVALUATORS); - JobDriver.this.contexts.put(context.getId(), context); - if (--JobDriver.this.expectCount <= 0) { - JobDriver.this.state = State.READY; - if (JobDriver.this.cmd == null) { - LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}", - JobDriver.this.state); - } else { - JobDriver.this.submit(JobDriver.this.cmd); - } - } - } - } - } - - /** - * Receive notification that the Context had completed. - * Remove context from the list of active context. - */ - final class ClosedContextHandler implements EventHandler<ClosedContext> { - @Override - public void onNext(final ClosedContext context) { - LOG.log(Level.INFO, "Completed Context: {0}", context.getId()); - synchronized (JobDriver.this) { - JobDriver.this.contexts.remove(context.getId()); - } - } - } - - /** - * Receive notification that the Context had failed. - * Remove context from the list of active context and notify the client. - */ - final class FailedContextHandler implements EventHandler<FailedContext> { - @Override - public void onNext(final FailedContext context) { - LOG.log(Level.SEVERE, "FailedContext", context); - synchronized (JobDriver.this) { - JobDriver.this.contexts.remove(context.getId()); - } - throw new RuntimeException("Failed context: ", context.asError()); - } - } - - /** - * Receive notification that the entire Evaluator had failed. - * Stop other jobs and pass this error to the job observer on the client. 
- */ - final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { - @Override - public void onNext(final FailedEvaluator eval) { - synchronized (JobDriver.this) { - LOG.log(Level.SEVERE, "FailedEvaluator", eval); - for (final FailedContext failedContext : eval.getFailedContextList()) { - JobDriver.this.contexts.remove(failedContext.getId()); - } - throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException()); - } - } - } - - /** - * Receive notification that the Task has completed successfully. - */ - final class CompletedTaskHandler implements EventHandler<CompletedTask> { - @Override - public void onNext(final CompletedTask task) { - LOG.log(Level.INFO, "Completed task: {0}", task.getId()); - // Take the message returned by the task and add it to the running result. - String result = "default result"; - try { - if (task.getId().contains(CLR_CONTEXT_SUFFIX)) { - result = new String(task.get()); - } else { - result = JVM_CODEC.decode(task.get()); - } - } catch (final Exception e) { - LOG.log(Level.WARNING, "failed to decode task outcome"); - } - synchronized (JobDriver.this) { - JobDriver.this.results.add(task.getId() + " :: " + result); - LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{ - task.getId(), JobDriver.this.results.size(), result, JobDriver.this.state}); - if (--JobDriver.this.expectCount <= 0) { - JobDriver.this.returnResults(); - JobDriver.this.state = State.READY; - if (JobDriver.this.cmd != null) { - JobDriver.this.submit(JobDriver.this.cmd); - } - } - } - } - } - - /** - * Receive notification from the client. 
- */ - final class ClientMessageHandler implements EventHandler<byte[]> { - @Override - public void onNext(final byte[] message) { - synchronized (JobDriver.this) { - final String command = JVM_CODEC.decode(message); - LOG.log(Level.INFO, "Client message: {0} state: {1}", - new Object[]{command, JobDriver.this.state}); - assert (JobDriver.this.cmd == null); - if (JobDriver.this.state == State.READY) { - JobDriver.this.submit(command); - } else { - // not ready yet - save the command for better times. - assert (JobDriver.this.state == State.WAIT_EVALUATORS); - JobDriver.this.cmd = command; - } - } - } - } - - /** - * Job Driver is ready and the clock is set up: request the evaluators. - */ - final class StartHandler implements EventHandler<StartTime> { - @Override - public void onNext(final StartTime startTime) { - LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime}); - assert (state == State.INIT); - requestEvaluators(); - } - } - - /** - * Shutting down the job driver: close the evaluators. 
- */ - final class StopHandler implements EventHandler<StopTime> { - @Override - public void onNext(final StopTime time) { - LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time}); - for (final ActiveContext context : contexts.values()) { - context.close(); - } - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/Launch.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/Launch.java b/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/Launch.java deleted file mode 100644 index 66dbe92..0000000 --- a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/Launch.java +++ /dev/null @@ -1,189 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.examples.retained_evalCLR; - -import org.apache.reef.client.ClientConfiguration; -import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; -import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.JavaConfigurationBuilder; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.annotations.Name; -import org.apache.reef.tang.annotations.NamedParameter; -import org.apache.reef.tang.exceptions.BindException; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.tang.formats.AvroConfigurationSerializer; -import org.apache.reef.tang.formats.CommandLine; -import org.apache.reef.tang.formats.ConfigurationModule; -import org.apache.reef.tang.formats.OptionalParameter; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Retained Evaluators example - main class. - */ -public final class Launch { - - /** - * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently - */ - private static final int MAX_NUMBER_OF_EVALUATORS = JobDriver.totalEvaluators; - /** - * Standard Java logger - */ - private static final Logger LOG = Logger.getLogger(Launch.class.getName()); - - /** - * This class should not be instantiated. - */ - private Launch() { - throw new RuntimeException("Do not instantiate this class!"); - } - - /** - * Parse the command line arguments. - * - * @param args command line arguments, as passed to main() - * @return Configuration object. - * @throws BindException configuration error. - * @throws IOException error reading the configuration. 
- */ - private static Configuration parseCommandLine(final String[] args) - throws BindException, IOException { - final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder(); - final CommandLine cl = new CommandLine(confBuilder); - cl.registerShortNameOfClass(Local.class); - cl.registerShortNameOfClass(Command.class); - cl.registerShortNameOfClass(NumRuns.class); - cl.processCommandLine(args); - return confBuilder.build(); - } - - private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf) - throws InjectionException, BindException { - final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf); - final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); - cb.bindNamedParameter(Command.class, injector.getNamedInstance(Command.class)); - cb.bindNamedParameter(NumRuns.class, String.valueOf(injector.getNamedInstance(NumRuns.class))); - return cb.build(); - } - - /** - * Parse command line arguments and create TANG configuration ready to be submitted to REEF. - * - * @param args Command line arguments, as passed into main(). - * @return (immutable) TANG Configuration object. - * @throws BindException if configuration commandLineInjector fails. - * @throws InjectionException if configuration commandLineInjector fails. - * @throws IOException error reading the configuration. 
- */ - private static Configuration getClientConfiguration(final String[] args) - throws BindException, InjectionException, IOException { - - final Configuration commandLineConf = parseCommandLine(args); - - final Configuration clientConfiguration = ClientConfiguration.CONF - .set(ClientConfiguration.ON_JOB_RUNNING, JobClient.RunningJobHandler.class) - .set(ClientConfiguration.ON_JOB_MESSAGE, JobClient.JobMessageHandler.class) - .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class) - .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class) - .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class) - .build(); - - // TODO: Remove the injector, have stuff injected. - final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf); - final boolean isLocal = commandLineInjector.getNamedInstance(Local.class); - final Configuration runtimeConfiguration; - if (isLocal) { - LOG.log(Level.INFO, "Running on the local runtime"); - runtimeConfiguration = LocalRuntimeConfiguration.CONF - .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS) - .build(); - } else { - LOG.log(Level.INFO, "Running on YARN"); - runtimeConfiguration = YarnClientConfiguration.CONF.build(); - } - - return Tang.Factory.getTang() - .newConfigurationBuilder(runtimeConfiguration, clientConfiguration, - cloneCommandLineConfiguration(commandLineConf)) - .build(); - } - - private static ConfigurationModule addAll(final ConfigurationModule conf, final OptionalParameter<String> param, final File folder) { - ConfigurationModule result = conf; - for (final File f : folder.listFiles()) { - if (f.canRead() && f.exists() && f.isFile()) { - result = result.set(param, f.getAbsolutePath()); - } - } - return result; - } - - /** - * Main method that starts the Retained Evaluators job. - * - * @param args command line parameters. 
- */ - public static void main(final String[] args) { - try { - final File dotNetFolder = new File(args[0]).getAbsoluteFile(); - String[] removedArgs = Arrays.copyOfRange(args, 1, args.length); - - final Configuration config = getClientConfiguration(removedArgs); - LOG.log(Level.INFO, "Configuration:\n--\n{0}--", - new AvroConfigurationSerializer().toString(config)); - final Injector injector = Tang.Factory.getTang().newInjector(config); - final JobClient client = injector.getInstance(JobClient.class); - client.submit(dotNetFolder); - client.waitForCompletion(); - LOG.info("Done!"); - } catch (final BindException | InjectionException | IOException ex) { - LOG.log(Level.SEVERE, "Job configuration error", ex); - } - } - - /** - * Command line parameter: a command to run. e.g. "echo Hello REEF" - */ - @NamedParameter(doc = "The shell command", short_name = "cmd", default_value = "*INTERACTIVE*") - public static final class Command implements Name<String> { - } - - /** - * Command line parameter: number of experiments to run. - */ - @NamedParameter(doc = "Number of times to run the command", - short_name = "num_runs", default_value = "1") - public static final class NumRuns implements Name<Integer> { - } - - /** - * Command line parameter = true to run locally, or false to run on YARN. 
- */ - @NamedParameter(doc = "Whether or not to run on the local runtime", - short_name = "local", default_value = "true") - public static final class Local implements Name<Boolean> { - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/package-info.java b/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/package-info.java deleted file mode 100644 index 45455bb..0000000 --- a/lang/java/reef-examples-clr/src/main/java/org/apache/reef/examples/retained_evalCLR/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * The Retained Evaluators CLR example. 
- */ -package org.apache.reef.examples.retained_evalCLR; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/pom.xml b/lang/java/reef-examples/pom.xml index 68c4693..4fd9aa1 100644 --- a/lang/java/reef-examples/pom.xml +++ b/lang/java/reef-examples/pom.xml @@ -216,65 +216,6 @@ under the License. </profile> <profile> - <id>RetainedEval</id> - <build> - <defaultGoal>exec:exec</defaultGoal> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>exec-maven-plugin</artifactId> - <configuration> - <executable>java</executable> - <arguments> - <argument>-classpath</argument> - <classpath/> - <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config - </argument> - <argument>-Dcom.microsoft.reef.runtime.local.folder=${project.build.directory} - </argument> - <argument>org.apache.reef.examples.retained_eval.Launch</argument> - <!-- <argument>-cmd</argument> - <argument>date</argument> - <argument>-num_runs</argument> - <argument>20</argument> - <argument>-local</argument> - <argument>true</argument> --> - </arguments> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>RetainedEval_yarn</id> - <build> - <defaultGoal>exec:exec</defaultGoal> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>exec-maven-plugin</artifactId> - <configuration> - <executable>java</executable> - <arguments> - <argument>-classpath</argument> - <classpath/> - <argument>-Djava.util.logging.config.class=org.apache.reef.util.logging.Config - </argument> - <argument>org.apache.reef.examples.retained_eval.Launch</argument> - <argument>-cmd</argument> - <argument>date</argument> - <argument>-num_runs</argument> - <argument>20</argument> - <argument>-local</argument> - <argument>false</argument> - </arguments> - </configuration> - </plugin> - </plugins> - 
</build> - </profile> - - <profile> <id>SuspendDemo</id> <build> <defaultGoal>exec:exec</defaultGoal> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java deleted file mode 100644 index 61d549d..0000000 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java +++ /dev/null @@ -1,335 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.examples.retained_eval; - -import org.apache.reef.client.*; -import org.apache.reef.examples.library.Command; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.JavaConfigurationBuilder; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.annotations.NamedParameter; -import org.apache.reef.tang.annotations.Parameter; -import org.apache.reef.tang.annotations.Unit; -import org.apache.reef.tang.exceptions.BindException; -import org.apache.reef.util.EnvironmentUtils; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; - -import javax.inject.Inject; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Retained Evaluator Shell Client. - */ -@Unit -public class JobClient { - - /** - * Standard java logger. - */ - private static final Logger LOG = Logger.getLogger(JobClient.class.getName()); - - /** - * Codec to translate messages to and from the job driver - */ - private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); - - /** - * Reference to the REEF framework. - * This variable is injected automatically in the constructor. - */ - private final REEF reef; - - /** - * Shell command to submitTask to the job driver. - */ - private final String command; - - /** - * Job Driver configuration. - */ - private final Configuration driverConfiguration; - - /** - * If true, take commands from stdin; otherwise, use -cmd parameter in batch mode. - */ - private final boolean isInteractive; - - /** - * Total number of experiments to run. - */ - private final int maxRuns; - - /** - * Command prompt reader for the interactive mode (stdin). 
- */ - private final BufferedReader prompt; - - /** - * A reference to the running job that allows client to send messages back to the job driver - */ - private RunningJob runningJob; - - /** - * Start timestamp of the current task. - */ - private long startTime = 0; - - /** - * Total time spent performing tasks in Evaluators. - */ - private long totalTime = 0; - - /** - * Number of experiments ran so far. - */ - private int numRuns = 0; - - /** - * Set to false when job driver is done. - */ - private boolean isBusy = true; - - /** - * Last result returned from the job driver. - */ - private String lastResult; - - /** - * Retained Evaluator client. - * Parameters are injected automatically by TANG. - * - * @param command Shell command to run on each Evaluator. - * @param reef Reference to the REEF framework. - */ - @Inject - JobClient(final REEF reef, - @Parameter(Command.class) final String command, - @Parameter(Launch.NumRuns.class) final Integer numRuns, - @Parameter(Launch.NumEval.class) final Integer numEvaluators) throws BindException { - - this.reef = reef; - this.command = command; - this.maxRuns = numRuns; - - // If command is not set, switch to interactive mode. (Yes, we compare pointers here) - this.isInteractive = this.command == - Command.class.getAnnotation(NamedParameter.class).default_value(); - - this.prompt = this.isInteractive ? 
- new BufferedReader(new InputStreamReader(System.in)) : null; - - final JavaConfigurationBuilder configBuilder = Tang.Factory.getTang().newConfigurationBuilder(); - configBuilder.addConfiguration( - DriverConfiguration.CONF - .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class)) - .set(DriverConfiguration.DRIVER_IDENTIFIER, "eval-" + System.currentTimeMillis()) - .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class) - .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class) - .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class) - .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class) - .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class) - .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class) - .set(DriverConfiguration.ON_CLIENT_MESSAGE, JobDriver.ClientMessageHandler.class) - .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class) - .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class) - .build() - ); - configBuilder.bindNamedParameter(Launch.NumEval.class, "" + numEvaluators); - this.driverConfiguration = configBuilder.build(); - } - - /** - * @return a Configuration binding the ClientConfiguration.* event handlers to this Client. - */ - public static Configuration getClientConfiguration() { - return ClientConfiguration.CONF - .set(ClientConfiguration.ON_JOB_RUNNING, JobClient.RunningJobHandler.class) - .set(ClientConfiguration.ON_JOB_MESSAGE, JobClient.JobMessageHandler.class) - .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class) - .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class) - .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class) - .build(); - } - - /** - * Launch the job driver. 
- * - * @throws BindException configuration error. - */ - public void submit() { - this.reef.submit(this.driverConfiguration); - } - - /** - * Send command to the job driver. Record timestamp when the command was sent. - * If this.command is set, use it; otherwise, ask user for the command. - */ - private synchronized void submitTask() { - if (this.isInteractive) { - String cmd; - try { - do { - System.out.print("\nRE> "); - cmd = this.prompt.readLine(); - } while (cmd != null && cmd.trim().isEmpty()); - } catch (final IOException ex) { - LOG.log(Level.FINE, "Error reading from stdin: {0}", ex); - cmd = null; - } - if (cmd == null || cmd.equals("exit")) { - this.runningJob.close(); - stopAndNotify(); - } else { - this.submitTask(cmd); - } - } else { - // non-interactive batch mode: - this.submitTask(this.command); - } - } - - /** - * Send command to the job driver. Record timestamp when the command was sent. - * - * @param cmd shell command to execute in all Evaluators. - */ - private synchronized void submitTask(final String cmd) { - LOG.log(Level.FINE, "Submit task {0} \"{1}\" to {2}", - new Object[]{this.numRuns + 1, cmd, this.runningJob}); - this.startTime = System.currentTimeMillis(); - this.runningJob.send(CODEC.encode(cmd)); - } - - /** - * Notify the process in waitForCompletion() method that the main process has finished. - */ - private synchronized void stopAndNotify() { - this.runningJob = null; - this.isBusy = false; - this.notify(); - } - - /** - * Wait for the job driver to complete. 
This method is called from Launch.main() - */ - public String waitForCompletion() { - while (this.isBusy) { - LOG.log(Level.FINE, "Waiting for the Job Driver to complete."); - try { - synchronized (this) { - this.wait(); - } - } catch (final InterruptedException ex) { - LOG.log(Level.WARNING, "Waiting for result interrupted.", ex); - } - } - return this.lastResult; - } - - public void close() { - this.reef.close(); - } - - /** - * Receive notification from the job driver that the job is running. - */ - final class RunningJobHandler implements EventHandler<RunningJob> { - @Override - public void onNext(final RunningJob job) { - LOG.log(Level.FINE, "Running job: {0}", job.getId()); - synchronized (JobClient.this) { - JobClient.this.runningJob = job; - JobClient.this.submitTask(); - } - } - } - - /** - * Receive message from the job driver. - * There is only one message, which comes at the end of the driver execution - * and contains shell command output on each node. - */ - final class JobMessageHandler implements EventHandler<JobMessage> { - @Override - public void onNext(final JobMessage message) { - synchronized (JobClient.this) { - - lastResult = CODEC.decode(message.get()); - final long jobTime = System.currentTimeMillis() - startTime; - totalTime += jobTime; - ++numRuns; - - LOG.log(Level.FINE, "TIME: Task {0} completed in {1} msec.:\n{2}", - new Object[]{"" + numRuns, "" + jobTime, lastResult}); - - System.out.println(lastResult); - - if (runningJob != null) { - if (isInteractive || numRuns < maxRuns) { - submitTask(); - } else { - LOG.log(Level.INFO, - "All {0} tasks complete; Average task time: {1}. Closing the job driver.", - new Object[]{maxRuns, totalTime / (double) maxRuns}); - runningJob.close(); - stopAndNotify(); - } - } - } - } - } - - /** - * Receive notification from the job driver that the job had failed. 
- */ - final class FailedJobHandler implements EventHandler<FailedJob> { - @Override - public void onNext(final FailedJob job) { - LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null)); - stopAndNotify(); - } - } - - /** - * Receive notification from the job driver that the job had completed successfully. - */ - final class CompletedJobHandler implements EventHandler<CompletedJob> { - @Override - public void onNext(final CompletedJob job) { - LOG.log(Level.FINE, "Completed job: {0}", job.getId()); - stopAndNotify(); - } - } - - /** - * Receive notification that there was an exception thrown from the job driver. - */ - final class RuntimeErrorHandler implements EventHandler<FailedRuntime> { - @Override - public void onNext(final FailedRuntime error) { - LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getReason().orElse(null)); - stopAndNotify(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9f38b25/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java deleted file mode 100644 index b2b2055..0000000 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java +++ /dev/null @@ -1,370 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.examples.retained_eval; - -import org.apache.reef.driver.client.JobMessageObserver; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.context.ClosedContext; -import org.apache.reef.driver.context.ContextConfiguration; -import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.evaluator.EvaluatorRequest; -import org.apache.reef.driver.evaluator.EvaluatorRequestor; -import org.apache.reef.driver.evaluator.FailedEvaluator; -import org.apache.reef.driver.task.CompletedTask; -import org.apache.reef.driver.task.TaskConfiguration; -import org.apache.reef.examples.library.Command; -import org.apache.reef.examples.library.ShellTask; -import org.apache.reef.tang.JavaConfigurationBuilder; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.annotations.Parameter; -import org.apache.reef.tang.annotations.Unit; -import org.apache.reef.tang.exceptions.BindException; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; -import org.apache.reef.wake.time.event.StartTime; -import org.apache.reef.wake.time.event.StopTime; - -import javax.inject.Inject; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Retained Evaluator example job driver. 
Execute shell command on all evaluators, - * capture stdout, and return concatenated results back to the client. - */ -@Unit -public final class JobDriver { - /** - * Standard Java logger. - */ - private static final Logger LOG = Logger.getLogger(JobDriver.class.getName()); - - /** - * Duration of one clock interval. - */ - private static final int CHECK_UP_INTERVAL = 1000; // 1 sec. - - /** - * String codec is used to encode the results - * before passing them back to the client. - */ - private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); - /** - * Job observer on the client. - * We use it to send results from the driver back to the client. - */ - private final JobMessageObserver jobMessageObserver; - /** - * Job driver uses EvaluatorRequestor - * to request Evaluators that will run the Tasks. - */ - private final EvaluatorRequestor evaluatorRequestor; - /** - * Number of Evalutors to request (default is 1). - */ - private final int numEvaluators; - /** - * Shell execution results from each Evaluator. - */ - private final List<String> results = new ArrayList<>(); - /** - * Map from context ID to running evaluator context. - */ - private final Map<String, ActiveContext> contexts = new HashMap<>(); - /** - * Job driver state. - */ - private State state = State.INIT; - /** - * First command to execute. Sometimes client can send us the first command - * before Evaluators are available; we need to store this command here. - */ - private String cmd; - /** - * Number of evaluators/tasks to complete. - */ - private int expectCount = 0; - - /** - * Job driver constructor. - * All parameters are injected from TANG automatically. - * - * @param jobMessageObserver is used to send messages back to the client. - * @param evaluatorRequestor is used to request Evaluators. 
- */ - @Inject - JobDriver(final JobMessageObserver jobMessageObserver, - final EvaluatorRequestor evaluatorRequestor, - final @Parameter(Launch.NumEval.class) Integer numEvaluators) { - this.jobMessageObserver = jobMessageObserver; - this.evaluatorRequestor = evaluatorRequestor; - this.numEvaluators = numEvaluators; - } - - /** - * Construct the final result and forward it to the Client. - */ - private void returnResults() { - final StringBuilder sb = new StringBuilder(); - for (final String result : this.results) { - sb.append(result); - } - this.results.clear(); - LOG.log(Level.INFO, "Return results to the client:\n{0}", sb); - this.jobMessageObserver.sendMessageToClient(CODEC.encode(sb.toString())); - } - - /** - * Submit command to all available evaluators. - * - * @param command shell command to execute. - */ - private void submit(final String command) { - LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}", - new Object[]{command, this.contexts.size(), this.state}); - assert (this.state == State.READY); - this.expectCount = this.contexts.size(); - this.state = State.WAIT_TASKS; - this.cmd = null; - for (final ActiveContext context : this.contexts.values()) { - this.submit(context, command); - } - } - - /** - * Submit a Task that execute the command to a single Evaluator. - * This method is called from <code>submitTask(cmd)</code>. 
- */ - private void submit(final ActiveContext context, final String command) { - try { - LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context}); - final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); - cb.addConfiguration( - TaskConfiguration.CONF - .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task") - .set(TaskConfiguration.TASK, ShellTask.class) - .build() - ); - cb.bindNamedParameter(Command.class, command); - context.submitTask(cb.build()); - } catch (final BindException ex) { - LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex); - context.close(); - throw new RuntimeException(ex); - } - } - - /** - * Request the evaluators. - */ - private synchronized void requestEvaluators() { - assert (this.state == State.INIT); - LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators); - this.evaluatorRequestor.submit( - EvaluatorRequest.newBuilder() - .setMemory(128) - .setNumberOfCores(1) - .setNumber(this.numEvaluators).build() - ); - this.state = State.WAIT_EVALUATORS; - this.expectCount = this.numEvaluators; - } - - /** - * Possible states of the job driver. Can be one of: - * <dl> - * <du><code>INIT</code></du><dd>initial state, ready to request the evaluators.</dd> - * <du><code>WAIT_EVALUATORS</code></du><dd>Wait for requested evaluators to initialize.</dd> - * <du><code>READY</code></du><dd>Ready to submitTask a new task.</dd> - * <du><code>WAIT_TASKS</code></du><dd>Wait for tasks to complete.</dd> - * </dl> - */ - private enum State { - INIT, WAIT_EVALUATORS, READY, WAIT_TASKS - } - - /** - * Receive notification that an Evaluator had been allocated, - * and submitTask a new Task in that Evaluator. 
- */ - final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { - @Override - public void onNext(final AllocatedEvaluator eval) { - synchronized (JobDriver.this) { - LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}", - new Object[]{eval.getId(), JobDriver.this.expectCount, JobDriver.this.contexts.size()}); - assert (JobDriver.this.state == State.WAIT_EVALUATORS); - try { - eval.submitContext(ContextConfiguration.CONF.set( - ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build()); - } catch (final BindException ex) { - LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex); - throw new RuntimeException(ex); - } - } - } - } - - /** - * Receive notification that the entire Evaluator had failed. - * Stop other jobs and pass this error to the job observer on the client. - */ - final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { - @Override - public void onNext(final FailedEvaluator eval) { - synchronized (JobDriver.this) { - LOG.log(Level.SEVERE, "FailedEvaluator", eval); - for (final FailedContext failedContext : eval.getFailedContextList()) { - JobDriver.this.contexts.remove(failedContext.getId()); - } - throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException()); - } - } - } - - /** - * Receive notification that a new Context is available. - * Submit a new Distributed Shell Task to that Context. 
- */ - final class ActiveContextHandler implements EventHandler<ActiveContext> { - @Override - public void onNext(final ActiveContext context) { - synchronized (JobDriver.this) { - LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}", - new Object[]{context.getId(), JobDriver.this.expectCount, JobDriver.this.state}); - assert (JobDriver.this.state == State.WAIT_EVALUATORS); - JobDriver.this.contexts.put(context.getId(), context); - if (--JobDriver.this.expectCount <= 0) { - JobDriver.this.state = State.READY; - if (JobDriver.this.cmd == null) { - LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}", - JobDriver.this.state); - } else { - JobDriver.this.submit(JobDriver.this.cmd); - } - } - } - } - } - - /** - * Receive notification that the Context had completed. - * Remove context from the list of active context. - */ - final class ClosedContextHandler implements EventHandler<ClosedContext> { - @Override - public void onNext(final ClosedContext context) { - LOG.log(Level.INFO, "Completed Context: {0}", context.getId()); - synchronized (JobDriver.this) { - JobDriver.this.contexts.remove(context.getId()); - } - } - } - - /** - * Receive notification that the Context had failed. - * Remove context from the list of active context and notify the client. - */ - final class FailedContextHandler implements EventHandler<FailedContext> { - @Override - public void onNext(final FailedContext context) { - LOG.log(Level.SEVERE, "FailedContext", context); - synchronized (JobDriver.this) { - JobDriver.this.contexts.remove(context.getId()); - } - throw new RuntimeException("Failed context: ", context.asError()); - } - } - - /** - * Receive notification that the Task has completed successfully. 
- */ - final class CompletedTaskHandler implements EventHandler<CompletedTask> { - @Override - public void onNext(final CompletedTask task) { - LOG.log(Level.INFO, "Completed task: {0}", task.getId()); - // Take the message returned by the task and add it to the running result. - final String result = CODEC.decode(task.get()); - synchronized (JobDriver.this) { - JobDriver.this.results.add(task.getId() + " :: " + result); - LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{ - task.getId(), JobDriver.this.results.size(), result, JobDriver.this.state}); - if (--JobDriver.this.expectCount <= 0) { - JobDriver.this.returnResults(); - JobDriver.this.state = State.READY; - if (JobDriver.this.cmd != null) { - JobDriver.this.submit(JobDriver.this.cmd); - } - } - } - } - } - - /** - * Receive notification from the client. - */ - final class ClientMessageHandler implements EventHandler<byte[]> { - @Override - public void onNext(final byte[] message) { - synchronized (JobDriver.this) { - final String command = CODEC.decode(message); - LOG.log(Level.INFO, "Client message: {0} state: {1}", - new Object[]{command, JobDriver.this.state}); - assert (JobDriver.this.cmd == null); - if (JobDriver.this.state == State.READY) { - JobDriver.this.submit(command); - } else { - // not ready yet - save the command for better times. - assert (JobDriver.this.state == State.WAIT_EVALUATORS); - JobDriver.this.cmd = command; - } - } - } - } - - /** - * Job Driver is ready and the clock is set up: request the evaluators. - */ - final class StartHandler implements EventHandler<StartTime> { - @Override - public void onNext(final StartTime startTime) { - LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime}); - assert (state == State.INIT); - requestEvaluators(); - } - } - - /** - * Shutting down the job driver: close the evaluators. 
- */ - final class StopHandler implements EventHandler<StopTime> { - @Override - public void onNext(final StopTime time) { - LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time}); - for (final ActiveContext context : contexts.values()) { - context.close(); - } - } - } -}
