This is an automated email from the ASF dual-hosted git repository.

wonook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 65440c1  [MINOR] Avoid yarn conflict + Save Metrics after waiting to get full data  (#307)
65440c1 is described below

commit 65440c1a9acc5fdc4b7705de61610499ed2f6b84
Author: Lemarais <goehd4...@naver.com>
AuthorDate: Fri Apr 2 19:31:50 2021 +0900

    [MINOR] Avoid yarn conflict + Save Metrics after waiting to get full data  (#307)
    
    **Major changes:**
    -
    
    **Minor changes to note:**
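    - In bin/run_beam.sh, invoke Hadoop YARN as `$YARN_HOME/bin/yarn` so that the
      `yarn classpath` call does not resolve to the JavaScript package manager of
      the same name
    - In RuntimeMaster.flushMetrics, send the metric flush request only when no
      flush is pending, wait (up to METRIC_ARRIVE_TIMEOUT ms) on a latch sized to
      the number of running executors, and only then save the metrics to file and
      to the database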
    
    **Tests for the changes:**
    - None (this commit modifies no test files)
    
    **Other comments:**
    -
    
    Closes #307
---
 bin/run_beam.sh                                    |  2 +-
 .../apache/nemo/runtime/master/RuntimeMaster.java  | 27 +++++++++++++++++++---
 .../nemo/runtime/master/metric/MetricStore.java    |  3 +++
 .../runtime/master/scheduler/ExecutorRegistry.java |  4 ++++
 4 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/bin/run_beam.sh b/bin/run_beam.sh
index ddd0c75..a0df1cd 100755
--- a/bin/run_beam.sh
+++ b/bin/run_beam.sh
@@ -21,4 +21,4 @@ VERSION=$(mvn -q \
   -Dexec.executable=echo -Dexec.args='${project.version}' \
   --non-recursive exec:exec)
 
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp 
examples/beam/target/nemo-examples-beam-${VERSION}-shaded.jar:client/target/nemo-client-${VERSION}-shaded.jar:`yarn
 classpath` org.apache.nemo.client.JobLauncher "$@"
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp 
examples/beam/target/nemo-examples-beam-${VERSION}-shaded.jar:client/target/nemo-client-${VERSION}-shaded.jar:`$YARN_HOME/bin/yarn
 classpath` org.apache.nemo.client.JobLauncher "$@"
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
index e298edd..d3b48f2 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java
@@ -41,6 +41,7 @@ import org.apache.nemo.runtime.master.metric.MetricStore;
 import org.apache.nemo.runtime.master.resource.ContainerManager;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.nemo.runtime.master.scheduler.BatchScheduler;
+import org.apache.nemo.runtime.master.scheduler.ExecutorRegistry;
 import org.apache.nemo.runtime.master.scheduler.Scheduler;
 import org.apache.nemo.runtime.master.servlet.*;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -90,6 +91,7 @@ public final class RuntimeMaster {
 
   private final Scheduler scheduler;
   private final ContainerManager containerManager;
+  private final ExecutorRegistry executorRegistry;
   private final MetricMessageHandler metricMessageHandler;
   private final MessageEnvironment masterMessageEnvironment;
   private final ClientRPC clientRPC;
@@ -130,6 +132,7 @@ public final class RuntimeMaster {
   @Inject
   private RuntimeMaster(final Scheduler scheduler,
                         final ContainerManager containerManager,
+                        final ExecutorRegistry executorRegistry,
                         final MetricMessageHandler metricMessageHandler,
                         final MessageEnvironment masterMessageEnvironment,
                         final MetricManagerMaster metricManagerMaster,
@@ -159,6 +162,7 @@ public final class RuntimeMaster {
 
     this.scheduler = scheduler;
     this.containerManager = containerManager;
+    this.executorRegistry = executorRegistry;
     this.metricMessageHandler = metricMessageHandler;
     this.masterMessageEnvironment = masterMessageEnvironment;
     this.masterMessageEnvironment
@@ -177,6 +181,7 @@ public final class RuntimeMaster {
     this.metricServer = startRestMetricServer();
     this.metricStore = MetricStore.getStore();
     this.planStateManager = planStateManager;
+    this.metricCountDownLatch = new CountDownLatch(0);
   }
 
   /**
@@ -219,11 +224,27 @@ public final class RuntimeMaster {
    * Flush metrics.
    */
   public void flushMetrics() {
-    // send metric flush request to all executors
-    metricManagerMaster.sendMetricFlushRequest();
+    if (metricCountDownLatch.getCount() == 0) {
+      metricCountDownLatch = new 
CountDownLatch(executorRegistry.getNumberOfRunningExecutors());
+      // send metric flush request to all executors
+      metricManagerMaster.sendMetricFlushRequest();
+    }
 
+    try {
+      if (!metricCountDownLatch.await(METRIC_ARRIVE_TIMEOUT, 
TimeUnit.MILLISECONDS)) {
+        LOG.warn("Write Metric before all metric messages arrived.");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Waiting Save Metric Process interrupted: ", e);
+      // clean up state...
+      Thread.currentThread().interrupt();
+    }
+
+    // save metric to file
     metricStore.dumpAllMetricToFile(Paths.get(dagDirectory,
       "Metric_" + jobId + "_" + System.currentTimeMillis() + 
".json").toString());
+
+    // save metric to database
     if (this.dbEnabled) {
       metricStore.saveOptimizationMetricsToDB(dbAddress, jobId, dbId, 
dbPassword);
     }
@@ -310,7 +331,6 @@ public final class RuntimeMaster {
           containerManager.requestContainer(resourceSpecification.left(), 
resourceSpecification.right());
         }
 
-        metricCountDownLatch = new CountDownLatch(resourceRequestCount.get());
       } catch (final Exception e) {
         throw new ContainerException(e);
       }
@@ -480,6 +500,7 @@ public final class RuntimeMaster {
     final ScheduledExecutorService dagLoggingExecutor = 
Executors.newSingleThreadScheduledExecutor();
     dagLoggingExecutor.scheduleAtFixedRate(new Runnable() {
       public void run() {
+        flushMetrics();
         planStateManager.storeJSON("periodic");
       }
     }, DAG_LOGGING_PERIOD, DAG_LOGGING_PERIOD, TimeUnit.MILLISECONDS);
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
index 53477bc..e227ead 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
@@ -245,6 +246,8 @@ public final class MetricStore {
     try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) 
{
       final String jsonDump = dumpAllMetricToJson();
       writer.write(jsonDump);
+    } catch (final FileNotFoundException e) {
+      LOG.warn("Failure while writing metrics to local file: {}", e);
     } catch (final IOException e) {
       throw new MetricException(e);
     }
diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java
 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java
index edf15ab..11d40c7 100644
--- 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java
+++ 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ExecutorRegistry.java
@@ -78,6 +78,10 @@ public final class ExecutorRegistry {
     consumer.accept(getRunningExecutors());
   }
 
+  public synchronized int getNumberOfRunningExecutors() {
+    return getRunningExecutors().size();
+  }
+
   synchronized void updateExecutor(
     final String executorId,
     final BiFunction<ExecutorRepresenter, ExecutorState, 
Pair<ExecutorRepresenter, ExecutorState>> updater) {

Reply via email to