This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 12949a4  [BEAM-6957] Spark portable runner: support metrics
     new a91516c  Merge pull request #8294: [BEAM-6957] Spark portable runner: support metrics
12949a4 is described below

commit 12949a4a3dd2223f2bb1edec77074f60a9ce4595
Author: Kyle Weaver <[email protected]>
AuthorDate: Mon Apr 1 10:47:18 2019 -0700

    [BEAM-6957] Spark portable runner: support metrics
---
 .../beam/runners/spark/SparkPipelineRunner.java    | 15 +++++++
 .../SparkBatchPortablePipelineTranslator.java      |  7 +++-
 .../translation/SparkExecutableStageFunction.java  | 46 ++++++++++++++--------
 .../SparkExecutableStageFunctionTest.java          | 14 ++++++-
 4 files changed, 63 insertions(+), 19 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index b4d25ab..d45f1a3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -24,12 +24,16 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
+import org.apache.beam.runners.core.metrics.MetricsPusher;
 import 
org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import 
org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkTranslationContext;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.metrics.MetricsOptions;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +60,10 @@ public class SparkPipelineRunner implements 
PortablePipelineRunner {
     final JavaSparkContext jsc = 
SparkContextFactory.getSparkContext(pipelineOptions);
     LOG.info(String.format("Running job %s on Spark master %s", 
jobInfo.jobId(), jsc.master()));
     AggregatorsAccumulator.init(pipelineOptions, jsc);
+
+    MetricsEnvironment.setMetricsSupported(false);
+    MetricsAccumulator.init(pipelineOptions, jsc);
+
     final SparkTranslationContext context =
         new SparkTranslationContext(jsc, pipelineOptions, jobInfo);
     final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
@@ -72,6 +80,13 @@ public class SparkPipelineRunner implements 
PortablePipelineRunner {
             });
 
     SparkPipelineResult result = new 
SparkPipelineResult.BatchMode(submissionFuture, jsc);
+    MetricsPusher metricsPusher =
+        new MetricsPusher(
+            MetricsAccumulator.getInstance().value(),
+            pipelineOptions.as(MetricsOptions.class),
+            result);
+    metricsPusher.start();
+
     result.waitUntilFinish();
     executorService.shutdown();
     return result;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
index 4fd65d5..2ad0669 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
@@ -39,6 +39,7 @@ import 
org.apache.beam.runners.core.construction.graph.QueryablePipeline;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -202,7 +203,11 @@ public class SparkBatchPortablePipelineTranslator {
 
     SparkExecutableStageFunction<InputT, SideInputT> function =
         new SparkExecutableStageFunction<>(
-            stagePayload, context.jobInfo, outputMap, 
broadcastVariablesBuilder.build());
+            stagePayload,
+            context.jobInfo,
+            outputMap,
+            broadcastVariablesBuilder.build(),
+            MetricsAccumulator.getInstance());
     JavaRDD<RawUnionValue> staged = inputRdd.mapPartitions(function);
 
     for (String outputId : outputs.values()) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index e9ff511..01b27a7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -32,6 +32,8 @@ import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
@@ -48,6 +50,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.slf4j.Logger;
@@ -74,24 +77,33 @@ public class SparkExecutableStageFunction<InputT, 
SideInputT>
   // map from pCollection id to tuple of serialized bytes and coder to decode 
the bytes
   private final Map<String, Tuple2<Broadcast<List<byte[]>>, 
WindowedValueCoder<SideInputT>>>
       sideInputs;
+  private final Accumulator<MetricsContainerStepMap> metricsAccumulator;
 
   SparkExecutableStageFunction(
       RunnerApi.ExecutableStagePayload stagePayload,
       JobInfo jobInfo,
       Map<String, Integer> outputMap,
-      Map<String, Tuple2<Broadcast<List<byte[]>>, 
WindowedValueCoder<SideInputT>>> sideInputs) {
-    this(stagePayload, outputMap, () -> 
DefaultJobBundleFactory.create(jobInfo), sideInputs);
+      Map<String, Tuple2<Broadcast<List<byte[]>>, 
WindowedValueCoder<SideInputT>>> sideInputs,
+      Accumulator<MetricsContainerStepMap> metricsAccumulator) {
+    this(
+        stagePayload,
+        outputMap,
+        () -> DefaultJobBundleFactory.create(jobInfo),
+        sideInputs,
+        metricsAccumulator);
   }
 
   SparkExecutableStageFunction(
       RunnerApi.ExecutableStagePayload stagePayload,
       Map<String, Integer> outputMap,
       JobBundleFactoryCreator jobBundleFactoryCreator,
-      Map<String, Tuple2<Broadcast<List<byte[]>>, 
WindowedValueCoder<SideInputT>>> sideInputs) {
+      Map<String, Tuple2<Broadcast<List<byte[]>>, 
WindowedValueCoder<SideInputT>>> sideInputs,
+      Accumulator<MetricsContainerStepMap> metricsAccumulator) {
     this.stagePayload = stagePayload;
     this.outputMap = outputMap;
     this.jobBundleFactoryCreator = jobBundleFactoryCreator;
     this.sideInputs = sideInputs;
+    this.metricsAccumulator = metricsAccumulator;
   }
 
   @Override
@@ -103,7 +115,20 @@ public class SparkExecutableStageFunction<InputT, 
SideInputT>
       ReceiverFactory receiverFactory = new ReceiverFactory(collector, 
outputMap);
       StateRequestHandler stateRequestHandler =
           getStateRequestHandler(executableStage, 
stageBundleFactory.getProcessBundleDescriptor());
-      SparkBundleProgressHandler bundleProgressHandler = new 
SparkBundleProgressHandler();
+      String stageName = stagePayload.getInput();
+      MetricsContainerImpl container = 
metricsAccumulator.localValue().getContainer(stageName);
+      BundleProgressHandler bundleProgressHandler =
+          new BundleProgressHandler() {
+            @Override
+            public void onProgress(ProcessBundleProgressResponse progress) {
+              container.update(progress.getMonitoringInfosList());
+            }
+
+            @Override
+            public void onCompleted(ProcessBundleResponse response) {
+              container.update(response.getMonitoringInfosList());
+            }
+          };
       try (RemoteBundle bundle =
           stageBundleFactory.getBundle(
               receiverFactory, stateRequestHandler, bundleProgressHandler)) {
@@ -184,17 +209,4 @@ public class SparkExecutableStageFunction<InputT, 
SideInputT>
       return receivedElement -> collector.add(new RawUnionValue(tagInt, 
receivedElement));
     }
   }
-
-  private static class SparkBundleProgressHandler implements 
BundleProgressHandler {
-
-    @Override
-    public void onProgress(ProcessBundleProgressResponse progress) {
-      // TODO
-    }
-
-    @Override
-    public void onCompleted(ProcessBundleResponse response) {
-      // TODO
-    }
-  }
 }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
index bba1ea4..56ca69a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
@@ -34,6 +34,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
@@ -46,6 +48,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.util.WindowedValue;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.spark.Accumulator;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -58,6 +61,9 @@ public class SparkExecutableStageFunctionTest {
   @Mock private JobBundleFactory jobBundleFactory;
   @Mock private StageBundleFactory stageBundleFactory;
   @Mock private RemoteBundle remoteBundle;
+  @Mock private Accumulator<MetricsContainerStepMap> metricsAccumulator;
+  @Mock private MetricsContainerStepMap stepMap;
+  @Mock private MetricsContainerImpl container;
 
   private final String inputId = "input-id";
   private final ExecutableStagePayload stagePayload =
@@ -85,6 +91,8 @@ public class SparkExecutableStageFunctionTest {
     ImmutableMap<String, FnDataReceiver<WindowedValue<?>>> inputReceiver =
         ImmutableMap.of("input", Mockito.mock(FnDataReceiver.class));
     when(remoteBundle.getInputReceivers()).thenReturn(inputReceiver);
+    when(metricsAccumulator.localValue()).thenReturn(stepMap);
+    when(stepMap.getContainer(any())).thenReturn(container);
   }
 
   @Test(expected = Exception.class)
@@ -201,6 +209,10 @@ public class SparkExecutableStageFunctionTest {
   private <InputT, SideInputT> SparkExecutableStageFunction<InputT, 
SideInputT> getFunction(
       Map<String, Integer> outputMap) {
     return new SparkExecutableStageFunction<>(
-        stagePayload, outputMap, jobBundleFactoryCreator, 
Collections.emptyMap());
+        stagePayload,
+        outputMap,
+        jobBundleFactoryCreator,
+        Collections.emptyMap(),
+        metricsAccumulator);
   }
 }

Reply via email to