This is an automated email from the ASF dual-hosted git repository.
aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
new df41a8b Persist all output Dataset if there are multiple outputs in pipeline Enabled Use*Metrics tests
df41a8b is described below
commit df41a8b0c85de1de3e7709d741d33890552cf981
Author: Alexey Romanenko <[email protected]>
AuthorDate: Wed Jul 10 19:09:55 2019 +0200
Persist all output Dataset if there are multiple outputs in pipeline
Enabled Use*Metrics tests
---
runners/spark/build.gradle | 5 -----
.../structuredstreaming/translation/batch/DoFnFunction.java | 2 --
.../translation/batch/ParDoTranslatorBatch.java | 13 +++++++++++--
3 files changed, 11 insertions(+), 9 deletions(-)
diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index 7cc0fb7..ca2e4a3 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -196,12 +196,7 @@ task validatesStructuredStreamingRunnerBatch(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
// Metrics
- excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
- excludeCategories 'org.apache.beam.sdk.testing.UsesCounterMetrics'
- excludeCategories 'org.apache.beam.sdk.testing.UsesDistributionMetrics'
- excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
- excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
// SDF
excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
index bb69b6d..0f64b97 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
@@ -120,11 +120,9 @@ public class DoFnFunction<InputT, OutputT>
windowingStrategy,
doFnSchemaInformation);
-// DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
new DoFnRunnerWithMetrics<>(stepName, doFnRunner, metricsAccum);
-// return new ProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
return new ProcessContext<>(doFn, doFnRunnerWithMetrics, outputManager, Collections.emptyIterator())
.processPartition(iter)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index b3316e6..e076397 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -105,7 +105,13 @@ class ParDoTranslatorBatch<InputT, OutputT>
Coder<InputT> inputCoder = ((PCollection<InputT>) context.getInput()).getCoder();
MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance();
-// MetricsContainerStepMapAccumulator metricsAccum = null;
+
+ List<TupleTag<?>> additionalOutputTags = new ArrayList<>();
+ for (TupleTag<?> tag: outputTags) {
+ if (!tag.equals(mainOutputTag)) {
+ additionalOutputTags.add(tag);
+ }
+ }
@SuppressWarnings("unchecked")
DoFnFunction<InputT, OutputT> doFnWrapper =
@@ -116,7 +122,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
windowingStrategy,
sideInputStrategies,
context.getSerializableOptions(),
- outputTags,
+ additionalOutputTags,
mainOutputTag,
inputCoder,
outputCoderMap,
@@ -125,6 +131,9 @@ class ParDoTranslatorBatch<InputT, OutputT>
Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
    inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder());
+ if (outputs.entrySet().size() > 1) {
+ allOutputs.persist();
+ }
for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
pruneOutputFilteredByTag(context, allOutputs, output);