seojangho closed pull request #73: [NEMO-62] Support Multiple DAG Submission in a Single User Program
URL: https://github.com/apache/incubator-nemo/pull/73
This is a PR merged from a forked repository. As GitHub hides the original diff of a foreign (fork) pull request on merge, it is displayed below for the sake of provenance:

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 0b80b83b3..75a015bd1 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -25,7 +25,6 @@
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.client.DriverConfiguration;
 import org.apache.reef.client.DriverLauncher;
-import org.apache.reef.client.LauncherStatus;
 import org.apache.reef.client.parameters.JobMessageHandler;
 import org.apache.reef.io.network.naming.LocalNameResolverConfiguration;
 import org.apache.reef.io.network.naming.NameServerConfiguration;
@@ -51,6 +50,7 @@
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;

 /**
  * Job launcher.
@@ -62,8 +62,14 @@
   private static Configuration jobAndDriverConf = null;
   private static Configuration deployModeConf = null;
   private static Configuration builtJobConf = null;
+
+  private static DriverLauncher driverLauncher;
+  private static DriverRPCServer driverRPCServer;
+
+  private static CountDownLatch driverReadyLatch;
+  private static CountDownLatch jobDoneLatch;
   private static String serializedDAG;
-  private static List<?> collectedData = new ArrayList<>();
+  private static final List<?> COLLECTED_DATA = new ArrayList<>();

   /**
    * private constructor.
@@ -78,16 +84,14 @@ private JobLauncher() {
    * @throws Exception exception on the way.
    */
   public static void main(final String[] args) throws Exception {
-    final DriverRPCServer driverRPCServer = new DriverRPCServer();
+    driverRPCServer = new DriverRPCServer();
+    // Registers actions for launching the DAG.
     driverRPCServer
         .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> { })
-        .registerHandler(ControlMessage.DriverToClientMessageType.ResourceReady, event ->
-            driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
-                .setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
-                .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder().setDag(serializedDAG).build())
-                .build()))
-        .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> collectedData.addAll(
+        .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown())
+        .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown())
+        .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll(
            SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
        .run();
@@ -109,36 +113,91 @@ public static void main(final String[] args) throws Exception {
     // Get DeployMode Conf
     deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientConf);

-    // Launch client main
-    runUserProgramMain(builtJobConf);
+    // Start Driver and launch user program.
+    try {
+      if (jobAndDriverConf == null || deployModeConf == null || builtJobConf == null) {
+        throw new RuntimeException("Configuration for launching driver is not ready");
+      }
+
+      // Launch driver
+      LOG.info("Launching driver");
+      driverReadyLatch = new CountDownLatch(1);
+      driverLauncher = DriverLauncher.getLauncher(deployModeConf);
+      driverLauncher.submit(jobAndDriverConf, 500);
+      // When the driver is up and the resource is ready, the DriverReady message is delivered.

-    driverRPCServer.shutdown();
+      // Launch client main
+      runUserProgramMain(builtJobConf);
+
+      // Trigger driver shutdown afterwards
+      driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
+          .setType(ControlMessage.ClientToDriverMessageType.DriverShutdown).build());
+      // Wait for driver to naturally finish
+      synchronized (driverLauncher) {
+        while (!driverLauncher.getStatus().isDone()) {
+          try {
+            LOG.info("Wait for the driver to finish");
+            driverLauncher.wait();
+          } catch (final InterruptedException e) {
+            LOG.warn("Interrupted: " + e);
+            // clean up state...
+            Thread.currentThread().interrupt();
+          }
+        }
+        LOG.info("Driver terminated");
+      }
+    } catch (final InjectionException e) {
+      throw new RuntimeException(e);
+    } finally {
+      // Close everything that's left
+      driverRPCServer.shutdown();
+      driverLauncher.close();
+      final Optional<Throwable> possibleError = driverLauncher.getStatus().getError();
+      if (possibleError.isPresent()) {
+        throw new RuntimeException(possibleError.get());
+      } else {
+        LOG.info("Job successfully completed");
+      }
+    }
   }

   /**
    * Launch application using the application DAG.
+   * Notice that we launch the DAG one at a time, as the result of a DAG has to be immediately returned to the
+   * Java variable before the application can be resumed.
    *
    * @param dag the application DAG.
    */
   // When modifying the signature of this method, see CompilerTestUtil#compileDAG and make corresponding changes
   public static void launchDAG(final DAG dag) {
+    // Wait until the driver is ready.
     try {
-      if (jobAndDriverConf == null || deployModeConf == null || builtJobConf == null) {
-        throw new RuntimeException("Configuration for launching driver is not ready");
-      }
-      serializedDAG = Base64.getEncoder().encodeToString(SerializationUtils.serialize(dag));
-      // Launch and wait indefinitely for the job to finish
-      final LauncherStatus launcherStatus = DriverLauncher.getLauncher(deployModeConf)
-          .run(jobAndDriverConf);
-      final Optional<Throwable> possibleError = launcherStatus.getError();
-      if (possibleError.isPresent()) {
-        throw new RuntimeException(possibleError.get());
-      } else {
-        LOG.info("Job successfully completed");
-      }
-    } catch (final InjectionException e) {
-      throw new RuntimeException(e);
+      LOG.info("Waiting for the driver to be ready");
+      driverReadyLatch.await();
+    } catch (final InterruptedException e) {
+      LOG.warn("Interrupted: " + e);
+      // clean up state...
+      Thread.currentThread().interrupt();
+    }
+
+    LOG.info("Launching DAG...");
+    serializedDAG = Base64.getEncoder().encodeToString(SerializationUtils.serialize(dag));
+    jobDoneLatch = new CountDownLatch(1);
+    driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
+        .setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
+        .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder().setDag(serializedDAG).build())
+        .build());
+
+    // Wait for the ExecutionDone message from the driver
+    try {
+      LOG.info("Waiting for the DAG to finish execution");
+      jobDoneLatch.await();
+    } catch (final InterruptedException e) {
+      LOG.warn("Interrupted: " + e);
+      // clean up state...
+      Thread.currentThread().interrupt();
     }
+    LOG.info("DAG execution done");
   }

   /**
@@ -160,7 +219,9 @@ private static void runUserProgramMain(final Configuration jobConf) throws Excep
       throw new RuntimeException("User Main Class not public");
     }

+    LOG.info("User program started");
     method.invoke(null, (Object) args);
+    LOG.info("User program finished");
   }

   /**
@@ -320,9 +381,12 @@ public static Configuration getBuiltJobConf() {
   /**
    * Get the collected data.
    *
+   * @param <T> the type of the data.
    * @return the collected data.
    */
   public static <T> List<T> getCollectedData() {
-    return (List<T>) collectedData;
+    final List<T> result = (List<T>) new ArrayList<>(COLLECTED_DATA);
+    COLLECTED_DATA.clear(); // flush after fetching.
+    return result;
   }
 }
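
The client-side flow above can be hard to follow in diff form, so here is a minimal, self-contained sketch of the handshake it implements: the client blocks on a one-shot latch until the driver reports DriverReady, and then blocks on a fresh per-DAG latch until ExecutionDone arrives. This is illustration only and not part of the patch; a plain thread stands in for the driver and the RPC layer, and only the latch pattern mirrors the code above.

    import java.util.concurrent.CountDownLatch;

    /** Illustration only: the latch-based handshake, with a thread standing in for the driver. */
    public final class HandshakeSketch {
      private static final CountDownLatch DRIVER_READY = new CountDownLatch(1);

      public static void main(final String[] args) throws InterruptedException {
        // Stand-in driver: announces readiness once.
        new Thread(DRIVER_READY::countDown).start();

        DRIVER_READY.await();                        // corresponds to the DriverReady message

        for (int dag = 1; dag <= 2; dag++) {         // each launchDAG() call creates a fresh latch
          final CountDownLatch jobDone = new CountDownLatch(1);
          new Thread(jobDone::countDown).start();    // stand-in for the ExecutionDone message
          jobDone.await();
          System.out.println("DAG " + dag + " execution done");
        }
      }
    }
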
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
index db1927e63..d41f9ea68 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
@@ -56,7 +56,7 @@ default Object getTag() {
   /**
    * Context of the transform.
    */
-  interface Context {
+  interface Context extends Serializable {
     /**
      * @return sideInputs.
      */
diff --git a/examples/resources/test_output_word_and_line_count b/examples/resources/test_output_word_and_line_count
new file mode 100644
index 000000000..d3181c493
--- /dev/null
+++ b/examples/resources/test_output_word_and_line_count
@@ -0,0 +1,18 @@
+line count: 11
+
+banana: 3
+bicycle: 2
+one: 1
+girl: 1
+two: 2
+three: 3
+tennis: 3
+jy: 4
+ski: 5
+piano: 3
+wonook: 5
+shakespeare: 1
+john: 2
+jangho: 2
+sanha: 1
+snowboard: 2
diff --git a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaWordAndLineCount.java b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaWordAndLineCount.java
new file mode 100644
index 000000000..a8ac3b505
--- /dev/null
+++ b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaWordAndLineCount.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.snu.nemo.examples.spark;
+
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaPairRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Java Spark word-count and line-count examples in one.
+ */
+public final class JavaWordAndLineCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  /**
+   * Private constructor.
+   */
+  private JavaWordAndLineCount() {
+  }
+
+  /**
+   * Main method.
+   * @param args arguments.
+   * @throws Exception exceptions.
+   */
+  public static void main(final String[] args) throws Exception {
+
+    if (args.length < 1) {
+      System.err.println("Usage: JavaWordAndLineCount <input_file> [<output_file>]");
+      System.exit(1);
+    }
+
+    SparkSession spark = SparkSession
+        .builder()
+        .appName("JavaWordAndLineCount")
+        .getOrCreate();
+
+    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
+
+    JavaPairRDD<String, Integer> lineOnes = lines.mapToPair(s -> new Tuple2<>("line count", 1));
+
+    JavaPairRDD<String, Integer> lineCounts = lineOnes.reduceByKey((i1, i2) -> i1 + i2);
+
+    List<Tuple2<String, Integer>> lineOutput = lineCounts.collect();
+
+    JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
+
+    JavaPairRDD<String, Integer> wordOnes = words.mapToPair(s -> new Tuple2<>(s, 1));
+
+    JavaPairRDD<String, Integer> wordCounts = wordOnes.reduceByKey((i1, i2) -> i1 + i2);
+
+    List<Tuple2<String, Integer>> wordOutput = wordCounts.collect();
+
+    final boolean writemode = args[1] != null;
+    if (writemode) { // print to output file
+      try (BufferedWriter bw = new BufferedWriter(new FileWriter(args[1]))) {
+        for (Tuple2<?, ?> lineTuple : lineOutput) {
+          bw.write(lineTuple._1 + ": " + lineTuple._2 + "\n\n");
+        }
+        for (Tuple2<?, ?> wordTuple : wordOutput) {
+          bw.write(wordTuple._1 + ": " + wordTuple._2 + "\n");
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    } else { // print to console.
+      for (Tuple2<?, ?> lineTuple : lineOutput) {
+        System.out.println(lineTuple._1 + ": " + lineTuple._2 + "\n");
+      }
+      for (Tuple2<?, ?> wordTuple : wordOutput) {
+        System.out.println(wordTuple._1 + ": " + wordTuple._2);
+      }
+    }
+    spark.stop();
+  }
+}
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
index c4d178f48..96ec20c56 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
@@ -66,8 +66,30 @@ public void testSparkWordCount() throws Exception {
       ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
     }
   }
+  /* TODO #152: enable execution of multiple jobs (call scheduleJob multiple times with caching).
+  @Test(timeout = TIMEOUT)
+  public void testSparkWordAndLineCount() throws Exception {
+    final String inputFileName = "sample_input_wordcount_spark";
+    final String outputFileName = "sample_output_word_and_line_count";
+    final String testResourceFilename = "test_output_word_and_line_count";
+    final String inputFilePath = fileBasePath + inputFileName;
+    final String outputFilePath = fileBasePath + outputFileName;
+
+    JobLauncher.main(builder
+        .addJobId(JavaWordAndLineCount.class.getSimpleName() + "_test")
+        .addUserMain(JavaWordAndLineCount.class.getCanonicalName())
+        .addUserArgs(inputFilePath, outputFilePath)
+        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFilename);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }

-  /* Temporary disabled because of Travis issue
+  /* Temporary disabled due to Travis issue
   @Test(timeout = TIMEOUT)
   public void testSparkMapReduce() throws Exception {
     final String inputFileName = "sample_input_wordcount_spark";
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java
index 426e8e048..2744d5886 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java
@@ -15,13 +15,14 @@
  */
 package edu.snu.nemo.runtime.common.metric;

+import java.io.Serializable;
 import java.util.List;

 /**
  * Interface for metric which contians its state.
  * @param <T> class of state of the metric.
  */
-public interface StateMetric<T> extends Metric {
+public interface StateMetric<T extends Serializable> extends Metric {
   /**
    * Get its list of {@link StateTransitionEvent}.
    * @return list of events.
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java
index 43ce124ee..abf164c5b 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java
@@ -15,11 +15,13 @@
  */
 package edu.snu.nemo.runtime.common.metric;

+import java.io.Serializable;
+
 /**
  * Event of state transition. It contains timestamp and the state transition.
  * @param <T> class of state for the metric.
  */
-public final class StateTransitionEvent<T> extends Event {
+public final class StateTransitionEvent<T extends Serializable> extends Event {
   private T prevState;
   private T newState;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 58c3fc6f7..13d8f949d 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -79,7 +79,6 @@ private PhysicalPlanGenerator(final StagePartitioner stagePartitioner,
     // this is needed because of DuplicateEdgeGroupProperty.
     handleDuplicateEdgeGroupProperty(dagOfStages);
-
     // Split StageGroup by Pull StageEdges
     splitScheduleGroupByPullStageEdges(dagOfStages);
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index b0b174709..59a321285 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -21,6 +21,7 @@ option java_outer_classname = "ControlMessage";

 enum ClientToDriverMessageType {
     LaunchDAG = 0;
+    DriverShutdown = 1;
 }

 message ClientToDriverMessage {
@@ -38,8 +39,9 @@ message DataCollectMessage {

 enum DriverToClientMessageType {
     DriverStarted = 0;
-    ResourceReady = 1;
+    DriverReady = 1;
     DataCollected = 2;
+    ExecutionDone = 3;
 }

 message DriverToClientMessage {
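
The driver-side changes that follow rely on a simple ordering property: a single shared worker thread (runnerThread) executes every LaunchDAG request and, last of all, the DriverShutdown request, so a shutdown can never overtake a DAG that is still queued or running. The following is a tiny stand-alone sketch of that property, illustration only, using nothing but java.util.concurrent rather than any Nemo API:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /** Illustration only: FIFO execution on a single-threaded executor, as the driver relies on below. */
    public final class OrderingSketch {
      public static void main(final String[] args) {
        final ExecutorService runnerThread = Executors.newSingleThreadExecutor();

        // Each LaunchDAG message enqueues one run; they execute strictly one at a time, in order.
        runnerThread.execute(() -> System.out.println("run DAG #1, then report ExecutionDone"));
        runnerThread.execute(() -> System.out.println("run DAG #2, then report ExecutionDone"));

        // DriverShutdown enqueues termination last, so it runs only after all pending DAGs.
        runnerThread.execute(() -> System.out.println("terminate the runtime master"));
        runnerThread.shutdown();
      }
    }
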
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index 45191162c..ca232e0d8 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -73,6 +73,9 @@
   private final String glusterDirectory;
   private final ClientRPC clientRPC;

+  private static ExecutorService runnerThread = Executors.newSingleThreadExecutor(
+      new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build());
+
   // Client for sending log messages
   private final RemoteClientMessageLoggingHandler handler;

@@ -101,8 +104,9 @@ private NemoDriver(final UserApplicationRunner userApplicationRunner,
     this.clientRPC = clientRPC;
     // TODO #69: Support job-wide execution property
     NodeNamesAssignmentPass.setBandwidthSpecificationString(bandwidthString);
-    clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.LaunchDAG,
-        message -> startSchedulingUserApplication(message.getLaunchDAG().getDag()));
+    clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.LaunchDAG, message ->
+        startSchedulingUserDAG(message.getLaunchDAG().getDag()));
+    clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.DriverShutdown, message -> shutdown());
     // Send DriverStarted message to the client
     clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
         .setType(ControlMessage.DriverToClientMessageType.DriverStarted).build());
@@ -116,6 +120,15 @@ private void setUpLogger() {
     rootLogger.addHandler(handler);
   }

+  /**
+   * Trigger shutdown of the driver and the runtime master.
+   */
+  private void shutdown() {
+    LOG.info("Driver shutdown initiated");
+    runnerThread.execute(runtimeMaster::terminate);
+    runnerThread.shutdown();
+  }
+
   /**
    * Driver started.
    */
@@ -149,20 +162,21 @@ public void onNext(final ActiveContext activeContext) {
         if (finalExecutorLaunched) {
           clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
-              .setType(ControlMessage.DriverToClientMessageType.ResourceReady).build());
+              .setType(ControlMessage.DriverToClientMessageType.DriverReady).build());
         }
       }
     }

   /**
-   * Start user application.
+   * Start user DAG.
    */
-  public void startSchedulingUserApplication(final String dagString) {
-    // Launch user application (with a new thread)
-    final ExecutorService userApplicationRunnerThread = Executors.newSingleThreadExecutor(
-        new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build());
-    userApplicationRunnerThread.execute(() -> userApplicationRunner.run(dagString));
-    userApplicationRunnerThread.shutdown();
+  public void startSchedulingUserDAG(final String dagString) {
+    runnerThread.execute(() -> {
+      userApplicationRunner.run(dagString);
+      // send driver notification that user application is done.
+      clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
+          .setType(ControlMessage.DriverToClientMessageType.ExecutionDone).build());
+    });
   }

   /**
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 4ab57a241..aad818527 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -120,7 +120,6 @@ public void run(final String dagString) {
       jobStateManager.storeJSON(dagDirectory, "final");

       LOG.info("{} is complete!", physicalPlan.getId());
-      runtimeMaster.terminate();
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
index 09c7c9d35..185638973 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
@@ -33,18 +33,15 @@ public final class MetricStore {
   private final Map<Class, Map<String, Object>> metricMap = new HashMap<>();
   // You can add more metrics by adding item to this metricList list.
-  private final Map<String, Class> metricList = new HashMap<String, Class>() {
-    {
-      put("JobMetric", JobMetric.class);
-      put("StageMetric", StageMetric.class);
-      put("TaskMetric", TaskMetric.class);
-    }
-  };
-
+  private final Map<String, Class> metricList = new HashMap<>();
   /**
    * Private constructor.
    */
-  private MetricStore() { }
+  private MetricStore() {
+    metricList.put("JobMetric", JobMetric.class);
+    metricList.put("StageMetric", StageMetric.class);
+    metricList.put("TaskMetric", TaskMetric.class);
+  }

   /**
    * Getter for singleton instance.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 454e91b9b..91c385cf0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -186,7 +186,9 @@ public void terminate() {
         LOG.warn("Terminating master before all executor terminated messages arrived.");
       }
     } catch (final InterruptedException e) {
-      LOG.warn("Waiting executor terminating process interrupted.");
+      LOG.warn("Waiting executor terminating process interrupted: " + e);
+      // clean up state...
+      Thread.currentThread().interrupt();
     }
     runtimeMasterThread.execute(() -> {
       scheduler.terminate();
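
Taken together, the patch lets one user main submit several DAGs back to back: each JobLauncher.launchDAG(...) call now blocks until its ExecutionDone arrives, and getCollectedData() drains the buffer so the next job starts clean. Below is a rough, self-contained model of that user-facing shape, illustration only; submit() and drainResults() are stand-ins written for this sketch, not Nemo API.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;

    /** Illustration only: the blocking submit / drain-results pattern a user program sees after this patch. */
    public final class SequentialJobsSketch {
      private static final List<String> COLLECTED = new ArrayList<>();

      /** Blocks until the "driver" (a background thread here) finishes the job, like launchDAG(). */
      private static void submit(final String dagName) throws InterruptedException {
        final CountDownLatch jobDone = new CountDownLatch(1);
        new Thread(() -> {
          COLLECTED.add(dagName + " result");   // stand-in for DataCollected messages
          jobDone.countDown();                  // stand-in for ExecutionDone
        }).start();
        jobDone.await();
      }

      /** Copies and clears the buffer, like the new getCollectedData(). */
      private static List<String> drainResults() {
        final List<String> result = new ArrayList<>(COLLECTED);
        COLLECTED.clear();
        return result;
      }

      public static void main(final String[] args) throws InterruptedException {
        submit("dag-1");
        final List<String> first = drainResults();  // available before the next DAG is built
        submit("dag-2 derived from " + first);
        System.out.println(drainResults());
      }
    }
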
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services