Repository: reef
Updated Branches:
  refs/heads/master f236c7a81 -> 58e7c67b4


[REEF-1743] Implement the Distributed Shell example in Java

JIRA:
  [REEF-1743](https://issues.apache.org/jira/browse/REEF-1743)

Pull Request:
  This closes #1259


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/58e7c67b
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/58e7c67b
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/58e7c67b

Branch: refs/heads/master
Commit: 58e7c67b4ec6b3f8d235f7eed60feeffba9503e3
Parents: f236c7a
Author: Sergiy Matusevych <[email protected]>
Authored: Fri May 6 13:00:53 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Feb 22 18:01:28 2017 -0800

----------------------------------------------------------------------
 .../distributedshell/NumEvaluators.java         |  29 +++++
 .../examples/distributedshell/RuntimeName.java  |  29 +++++
 .../examples/distributedshell/ShellClient.java  | 105 +++++++++++++++++++
 .../examples/distributedshell/ShellDriver.java  |  93 ++++++++++++++++
 .../examples/distributedshell/package-info.java |  22 ++++
 .../apache/reef/examples/library/Command.java   |   4 +-
 .../apache/reef/examples/library/ShellTask.java |   3 +-
 7 files changed, 282 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/58e7c67b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/NumEvaluators.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/NumEvaluators.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/NumEvaluators.java
new file mode 100644
index 0000000..6f14a6f
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/NumEvaluators.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.distributedshell;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/** Command line parameter: Number of evaluators to request. */
+@NamedParameter(doc = "Number of evaluators", short_name = "n", default_value 
= "1")
+final class NumEvaluators implements Name<Integer> {
+  /** Do not instantiate this class. */
+  private NumEvaluators() { }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/58e7c67b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/RuntimeName.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/RuntimeName.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/RuntimeName.java
new file mode 100644
index 0000000..1d915ac
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/RuntimeName.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.distributedshell;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/** Command line parameter: REEF runtime to use. Can be local, yarn, mesos, or 
hdi. */
+@NamedParameter(doc = "Runtime", short_name = "r", default_value = "local")
+final class RuntimeName implements Name<String> {
+  /** Do not instantiate this class. */
+  private RuntimeName() { }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/58e7c67b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/ShellClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/ShellClient.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/ShellClient.java
new file mode 100644
index 0000000..1596322
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/ShellClient.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.distributedshell;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/** The Client for REEF distributed shell example. */
+public final class ShellClient {
+
+  private static final Logger LOG = 
Logger.getLogger(ShellClient.class.getName());
+
+  /** Number of milliseconds to wait for the job to complete. */
+  private static final int JOB_TIMEOUT = 60000; // 1 min.
+
+  private static final Tang TANG = Tang.Factory.getTang();
+
+  private static final Configuration STATIC_DRIVER_CONFIG = 
DriverConfiguration.CONF
+      .set(DriverConfiguration.DRIVER_IDENTIFIER, "DistributedShell")
+      .set(DriverConfiguration.GLOBAL_LIBRARIES, 
EnvironmentUtils.getClassLocation(ShellDriver.class))
+      .set(DriverConfiguration.ON_DRIVER_STARTED, 
ShellDriver.StartHandler.class)
+      .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, 
ShellDriver.EvaluatorAllocatedHandler.class)
+      .build();
+
+  /**
+   * Start the distributed shell job.
+   * @param args command line parameters.
+   * @throws InjectionException configuration error.
+   */
+  public static void main(final String[] args) throws InjectionException, 
IOException {
+
+    final JavaConfigurationBuilder driverConfigBuilder = 
TANG.newConfigurationBuilder(STATIC_DRIVER_CONFIG);
+
+    new CommandLine(driverConfigBuilder)
+        .registerShortNameOfClass(Command.class)
+        .registerShortNameOfClass(NumEvaluators.class)
+        .registerShortNameOfClass(RuntimeName.class)
+        .processCommandLine(args);
+
+    final Configuration driverConfig = driverConfigBuilder.build();
+
+    final Injector injector = TANG.newInjector(driverConfig);
+
+    final int numEvaluators = injector.getNamedInstance(NumEvaluators.class);
+    final String runtimeName = injector.getNamedInstance(RuntimeName.class);
+    final String command = injector.getNamedInstance(Command.class);
+
+    LOG.log(Level.INFO, "REEF distributed shell: {0} evaluators on {1} runtime 
:: {2}",
+        new Object[] {numEvaluators, runtimeName, command});
+
+    final Configuration runtimeConfig;
+
+    switch (runtimeName) {
+    case "local":
+      runtimeConfig = LocalRuntimeConfiguration.CONF
+          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, 
numEvaluators)
+          .build();
+      break;
+    case "yarn":
+      runtimeConfig = YarnClientConfiguration.CONF.build();
+      break;
+    default:
+      LOG.log(Level.SEVERE, "Unknown runtime: {0}", runtimeName);
+      throw new IllegalArgumentException("Unknown runtime: " + runtimeName);
+    }
+
+    final LauncherStatus status = 
DriverLauncher.getLauncher(runtimeConfig).run(driverConfig, JOB_TIMEOUT);
+
+    LOG.log(Level.INFO, "REEF job completed: {0}", status);
+  }
+
+  /** This class should never be instantiated. */
+  private ShellClient() { }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/58e7c67b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/ShellDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/ShellDriver.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/ShellDriver.java
new file mode 100644
index 0000000..db33b7b
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/ShellDriver.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.distributedshell;
+
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/** The Driver code for REEF distributed shell application. */
+@Unit
+final class ShellDriver {
+
+  private static final Logger LOG = 
Logger.getLogger(ShellDriver.class.getName());
+
+  private static final Configuration STATIC_TASK_CONFIG = 
TaskConfiguration.CONF
+      .set(TaskConfiguration.IDENTIFIER, "ShellTask")
+      .set(TaskConfiguration.TASK, ShellTask.class)
+      .build();
+
+  private final EvaluatorRequestor requestor;
+  private final int numEvaluators;
+  private final String command;
+
+  @Inject
+  private ShellDriver(
+      final EvaluatorRequestor requestor,
+      @Parameter(NumEvaluators.class) final int numEvaluators,
+      @Parameter(Command.class) final String command) {
+
+    this.requestor = requestor;
+    this.numEvaluators = numEvaluators;
+    this.command = command;
+  }
+
+  /** Driver start event: Request the evaluators. */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(numEvaluators)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build());
+    }
+  }
+
+  /** AllocatedEvaluator event: Submit the distributed shell task. */
+  final class EvaluatorAllocatedHandler implements 
EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+
+      LOG.log(Level.INFO,
+          "Submitting command {0} task to evaluator: {1}", new Object[] 
{command, allocatedEvaluator});
+
+      final JavaConfigurationBuilder taskConfigBuilder =
+          Tang.Factory.getTang().newConfigurationBuilder(STATIC_TASK_CONFIG);
+
+      taskConfigBuilder.bindNamedParameter(Command.class, command);
+
+      allocatedEvaluator.submitTask(taskConfigBuilder.build());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/58e7c67b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/package-info.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/package-info.java
new file mode 100644
index 0000000..828a93d
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/distributedshell/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Simple Distributed Shell example.
+ */
+package org.apache.reef.examples.distributedshell;

http://git-wip-us.apache.org/repos/asf/reef/blob/58e7c67b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/Command.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/Command.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/Command.java
index f7e8b1c..b3a99f1 100644
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/Command.java
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/Command.java
@@ -22,8 +22,8 @@ import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
 
 /**
- * Command line parameter: a command to run. e.g. "echo Hello REEF"
+ * Command line parameter: a command to run, e.g. "echo Hello REEF"
  */
-@NamedParameter(doc = "The shell command", short_name = "cmd", default_value = 
"*INTERACTIVE*")
+@NamedParameter(doc = "The shell command", short_name = "cmd")
 public final class Command implements Name<String> {
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/58e7c67b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/ShellTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/ShellTask.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/ShellTask.java
index 77fa5ca..01eae81 100644
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/ShellTask.java
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/library/ShellTask.java
@@ -66,8 +66,9 @@ public final class ShellTask implements Task {
    */
   @Override
   public byte[] call(final byte[] memento) {
+    LOG.log(Level.INFO, "RUN: command: {0}", this.command);
     final String result = CommandUtils.runCommand(this.command);
-    LOG.log(Level.INFO, result);
+    LOG.log(Level.INFO, "RUN: result: {0}", result);
     return CODEC.encode(result);
   }
 }

Reply via email to