This is an automated email from the ASF dual-hosted git repository. wonook pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push: new 65440c1 [MINOR] Avoid yarn conflict + Save Metrics after waiting to get full data (#307) 65440c1 is described below commit 65440c1a9acc5fdc4b7705de61610499ed2f6b84 Author: Lemarais <goehd4...@naver.com> AuthorDate: Fri Apr 2 19:31:50 2021 +0900 [MINOR] Avoid yarn conflict + Save Metrics after waiting to get full data (#307) **Major changes:** - **Minor changes to note:** - Change the yarn command in the run_beam.sh file to avoid a conflict with the JavaScript package manager - Split the metric-saving part from the flushMetrics function and call it after waiting for the metric flush **Tests for the changes:** - **Other comments:** - Closes #307 --- bin/run_beam.sh | 2 +- .../apache/nemo/runtime/master/RuntimeMaster.java | 27 +++++++++++++++++++--- .../nemo/runtime/master/metric/MetricStore.java | 3 +++ .../runtime/master/scheduler/ExecutorRegistry.java | 4 ++++ 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/bin/run_beam.sh b/bin/run_beam.sh index ddd0c75..a0df1cd 100755 --- a/bin/run_beam.sh +++ b/bin/run_beam.sh @@ -21,4 +21,4 @@ VERSION=$(mvn -q \ -Dexec.executable=echo -Dexec.args='${project.version}' \ --non-recursive exec:exec) -java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-${VERSION}-shaded.jar:client/target/nemo-client-${VERSION}-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@" +java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-${VERSION}-shaded.jar:client/target/nemo-client-${VERSION}-shaded.jar:`$YARN_HOME/bin/yarn classpath` org.apache.nemo.client.JobLauncher "$@" diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java index e298edd..d3b48f2 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java +++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java @@ -41,6 +41,7 @@ import org.apache.nemo.runtime.master.metric.MetricStore; import org.apache.nemo.runtime.master.resource.ContainerManager; import org.apache.nemo.runtime.master.resource.ExecutorRepresenter; import org.apache.nemo.runtime.master.scheduler.BatchScheduler; +import org.apache.nemo.runtime.master.scheduler.ExecutorRegistry; import org.apache.nemo.runtime.master.scheduler.Scheduler; import org.apache.nemo.runtime.master.servlet.*; import org.apache.reef.annotations.audience.DriverSide; @@ -90,6 +91,7 @@ public final class RuntimeMaster { private final Scheduler scheduler; private final ContainerManager containerManager; + private final ExecutorRegistry executorRegistry; private final MetricMessageHandler metricMessageHandler; private final MessageEnvironment masterMessageEnvironment; private final ClientRPC clientRPC; @@ -130,6 +132,7 @@ public final class RuntimeMaster { @Inject private RuntimeMaster(final Scheduler scheduler, final ContainerManager containerManager, + final ExecutorRegistry executorRegistry, final MetricMessageHandler metricMessageHandler, final MessageEnvironment masterMessageEnvironment, final MetricManagerMaster metricManagerMaster, @@ -159,6 +162,7 @@ public final class RuntimeMaster { this.scheduler = scheduler; this.containerManager = containerManager; + this.executorRegistry = executorRegistry; this.metricMessageHandler = metricMessageHandler; this.masterMessageEnvironment = masterMessageEnvironment; this.masterMessageEnvironment @@ -177,6 +181,7 @@ public final class RuntimeMaster { this.metricServer = startRestMetricServer(); this.metricStore = MetricStore.getStore(); this.planStateManager = planStateManager; + this.metricCountDownLatch = new CountDownLatch(0); } /** @@ -219,11 +224,27 @@ public final class RuntimeMaster { * Flush metrics. 
*/ public void flushMetrics() { - // send metric flush request to all executors - metricManagerMaster.sendMetricFlushRequest(); + if (metricCountDownLatch.getCount() == 0) { + metricCountDownLatch = new CountDownLatch(executorRegistry.getNumberOfRunningExecutors()); + // send metric flush request to all executors + metricManagerMaster.sendMetricFlushRequest(); + } + try { + if (!metricCountDownLatch.await(METRIC_ARRIVE_TIMEOUT, TimeUnit.MILLISECONDS)) { + LOG.warn("Write Metric before all metric messages arrived."); + } + } catch (InterruptedException e) { + LOG.warn("Waiting Save Metric Process interrupted: ", e); + // clean up state... + Thread.currentThread().interrupt(); + } + + // save metric to file metricStore.dumpAllMetricToFile(Paths.get(dagDirectory, "Metric_" + jobId + "_" + System.currentTimeMillis() + ".json").toString()); + + // save metric to database if (this.dbEnabled) { metricStore.saveOptimizationMetricsToDB(dbAddress, jobId, dbId, dbPassword); } @@ -310,7 +331,6 @@ public final class RuntimeMaster { containerManager.requestContainer(resourceSpecification.left(), resourceSpecification.right()); } - metricCountDownLatch = new CountDownLatch(resourceRequestCount.get()); } catch (final Exception e) { throw new ContainerException(e); } @@ -480,6 +500,7 @@ public final class RuntimeMaster { final ScheduledExecutorService dagLoggingExecutor = Executors.newSingleThreadScheduledExecutor(); dagLoggingExecutor.scheduleAtFixedRate(new Runnable() { public void run() { + flushMetrics(); planStateManager.storeJSON("periodic"); } }, DAG_LOGGING_PERIOD, DAG_LOGGING_PERIOD, TimeUnit.MILLISECONDS); diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java index 53477bc..e227ead 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java +++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedWriter; import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; import java.io.FileWriter; import java.io.IOException; import java.lang.management.ManagementFactory; @@ -245,6 +246,8 @@ public final class MetricStore { try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) { final String jsonDump = dumpAllMetricToJson(); writer.write(jsonDump); + } catch (final FileNotFoundException e) { + LOG.warn("Failure while writing metrics to local file: {}", e); } catch (final IOException e) { throw new MetricException(e); } diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java index edf15ab..11d40c7 100644 --- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java @@ -78,6 +78,10 @@ public final class ExecutorRegistry { consumer.accept(getRunningExecutors()); } + public synchronized int getNumberOfRunningExecutors() { + return getRunningExecutors().size(); + } + synchronized void updateExecutor( final String executorId, final BiFunction<ExecutorRepresenter, ExecutorState, Pair<ExecutorRepresenter, ExecutorState>> updater) {