This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 12949a4 [BEAM-6957] Spark portable runner: support metrics
new a91516c Merge pull request #8294: [BEAM-6957] Spark portable runner: support metrics
12949a4 is described below
commit 12949a4a3dd2223f2bb1edec77074f60a9ce4595
Author: Kyle Weaver <[email protected]>
AuthorDate: Mon Apr 1 10:47:18 2019 -0700
[BEAM-6957] Spark portable runner: support metrics
---
.../beam/runners/spark/SparkPipelineRunner.java | 15 +++++++
.../SparkBatchPortablePipelineTranslator.java | 7 +++-
.../translation/SparkExecutableStageFunction.java | 46 ++++++++++++++--------
.../SparkExecutableStageFunctionTest.java | 14 ++++++-
4 files changed, 63 insertions(+), 19 deletions(-)
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index b4d25ab..d45f1a3 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -24,12 +24,16 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
+import org.apache.beam.runners.core.metrics.MetricsPusher;
import
org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import
org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkTranslationContext;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +60,10 @@ public class SparkPipelineRunner implements
PortablePipelineRunner {
final JavaSparkContext jsc =
SparkContextFactory.getSparkContext(pipelineOptions);
LOG.info(String.format("Running job %s on Spark master %s",
jobInfo.jobId(), jsc.master()));
AggregatorsAccumulator.init(pipelineOptions, jsc);
+
+ MetricsEnvironment.setMetricsSupported(false);
+ MetricsAccumulator.init(pipelineOptions, jsc);
+
final SparkTranslationContext context =
new SparkTranslationContext(jsc, pipelineOptions, jobInfo);
final ExecutorService executorService =
Executors.newSingleThreadExecutor();
@@ -72,6 +80,13 @@ public class SparkPipelineRunner implements
PortablePipelineRunner {
});
SparkPipelineResult result = new
SparkPipelineResult.BatchMode(submissionFuture, jsc);
+ MetricsPusher metricsPusher =
+ new MetricsPusher(
+ MetricsAccumulator.getInstance().value(),
+ pipelineOptions.as(MetricsOptions.class),
+ result);
+ metricsPusher.start();
+
result.waitUntilFinish();
executorService.shutdown();
return result;
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
index 4fd65d5..2ad0669 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
@@ -39,6 +39,7 @@ import
org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -202,7 +203,11 @@ public class SparkBatchPortablePipelineTranslator {
SparkExecutableStageFunction<InputT, SideInputT> function =
new SparkExecutableStageFunction<>(
- stagePayload, context.jobInfo, outputMap,
broadcastVariablesBuilder.build());
+ stagePayload,
+ context.jobInfo,
+ outputMap,
+ broadcastVariablesBuilder.build(),
+ MetricsAccumulator.getInstance());
JavaRDD<RawUnionValue> staged = inputRdd.mapPartitions(function);
for (String outputId : outputs.values()) {
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index e9ff511..01b27a7 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -32,6 +32,8 @@ import
org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
@@ -48,6 +50,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.slf4j.Logger;
@@ -74,24 +77,33 @@ public class SparkExecutableStageFunction<InputT,
SideInputT>
// map from pCollection id to tuple of serialized bytes and coder to decode
the bytes
private final Map<String, Tuple2<Broadcast<List<byte[]>>,
WindowedValueCoder<SideInputT>>>
sideInputs;
+ private final Accumulator<MetricsContainerStepMap> metricsAccumulator;
SparkExecutableStageFunction(
RunnerApi.ExecutableStagePayload stagePayload,
JobInfo jobInfo,
Map<String, Integer> outputMap,
- Map<String, Tuple2<Broadcast<List<byte[]>>,
WindowedValueCoder<SideInputT>>> sideInputs) {
- this(stagePayload, outputMap, () ->
DefaultJobBundleFactory.create(jobInfo), sideInputs);
+ Map<String, Tuple2<Broadcast<List<byte[]>>,
WindowedValueCoder<SideInputT>>> sideInputs,
+ Accumulator<MetricsContainerStepMap> metricsAccumulator) {
+ this(
+ stagePayload,
+ outputMap,
+ () -> DefaultJobBundleFactory.create(jobInfo),
+ sideInputs,
+ metricsAccumulator);
}
SparkExecutableStageFunction(
RunnerApi.ExecutableStagePayload stagePayload,
Map<String, Integer> outputMap,
JobBundleFactoryCreator jobBundleFactoryCreator,
- Map<String, Tuple2<Broadcast<List<byte[]>>,
WindowedValueCoder<SideInputT>>> sideInputs) {
+ Map<String, Tuple2<Broadcast<List<byte[]>>,
WindowedValueCoder<SideInputT>>> sideInputs,
+ Accumulator<MetricsContainerStepMap> metricsAccumulator) {
this.stagePayload = stagePayload;
this.outputMap = outputMap;
this.jobBundleFactoryCreator = jobBundleFactoryCreator;
this.sideInputs = sideInputs;
+ this.metricsAccumulator = metricsAccumulator;
}
@Override
@@ -103,7 +115,20 @@ public class SparkExecutableStageFunction<InputT,
SideInputT>
ReceiverFactory receiverFactory = new ReceiverFactory(collector,
outputMap);
StateRequestHandler stateRequestHandler =
getStateRequestHandler(executableStage,
stageBundleFactory.getProcessBundleDescriptor());
- SparkBundleProgressHandler bundleProgressHandler = new
SparkBundleProgressHandler();
+ String stageName = stagePayload.getInput();
+ MetricsContainerImpl container =
metricsAccumulator.localValue().getContainer(stageName);
+ BundleProgressHandler bundleProgressHandler =
+ new BundleProgressHandler() {
+ @Override
+ public void onProgress(ProcessBundleProgressResponse progress) {
+ container.update(progress.getMonitoringInfosList());
+ }
+
+ @Override
+ public void onCompleted(ProcessBundleResponse response) {
+ container.update(response.getMonitoringInfosList());
+ }
+ };
try (RemoteBundle bundle =
stageBundleFactory.getBundle(
receiverFactory, stateRequestHandler, bundleProgressHandler)) {
@@ -184,17 +209,4 @@ public class SparkExecutableStageFunction<InputT,
SideInputT>
return receivedElement -> collector.add(new RawUnionValue(tagInt,
receivedElement));
}
}
-
- private static class SparkBundleProgressHandler implements
BundleProgressHandler {
-
- @Override
- public void onProgress(ProcessBundleProgressResponse progress) {
- // TODO
- }
-
- @Override
- public void onCompleted(ProcessBundleResponse response) {
- // TODO
- }
- }
}
diff --git
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
index bba1ea4..56ca69a 100644
---
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
+++
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
@@ -34,6 +34,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
@@ -46,6 +48,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.util.WindowedValue;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.spark.Accumulator;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@@ -58,6 +61,9 @@ public class SparkExecutableStageFunctionTest {
@Mock private JobBundleFactory jobBundleFactory;
@Mock private StageBundleFactory stageBundleFactory;
@Mock private RemoteBundle remoteBundle;
+ @Mock private Accumulator<MetricsContainerStepMap> metricsAccumulator;
+ @Mock private MetricsContainerStepMap stepMap;
+ @Mock private MetricsContainerImpl container;
private final String inputId = "input-id";
private final ExecutableStagePayload stagePayload =
@@ -85,6 +91,8 @@ public class SparkExecutableStageFunctionTest {
ImmutableMap<String, FnDataReceiver<WindowedValue<?>>> inputReceiver =
ImmutableMap.of("input", Mockito.mock(FnDataReceiver.class));
when(remoteBundle.getInputReceivers()).thenReturn(inputReceiver);
+ when(metricsAccumulator.localValue()).thenReturn(stepMap);
+ when(stepMap.getContainer(any())).thenReturn(container);
}
@Test(expected = Exception.class)
@@ -201,6 +209,10 @@ public class SparkExecutableStageFunctionTest {
private <InputT, SideInputT> SparkExecutableStageFunction<InputT,
SideInputT> getFunction(
Map<String, Integer> outputMap) {
return new SparkExecutableStageFunction<>(
- stagePayload, outputMap, jobBundleFactoryCreator,
Collections.emptyMap());
+ stagePayload,
+ outputMap,
+ jobBundleFactoryCreator,
+ Collections.emptyMap(),
+ metricsAccumulator);
}
}