This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new df41a8b  Persist all output Dataset if there are multiple outputs in pipeline Enabled Use*Metrics tests
df41a8b is described below

commit df41a8b0c85de1de3e7709d741d33890552cf981
Author: Alexey Romanenko <[email protected]>
AuthorDate: Wed Jul 10 19:09:55 2019 +0200

    Persist all output Dataset if there are multiple outputs in pipeline
    Enabled Use*Metrics tests
---
 runners/spark/build.gradle                                  |  5 -----
 .../structuredstreaming/translation/batch/DoFnFunction.java |  2 --
 .../translation/batch/ParDoTranslatorBatch.java             | 13 +++++++++++--
 3 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index 7cc0fb7..ca2e4a3 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -196,12 +196,7 @@ task validatesStructuredStreamingRunnerBatch(type: Test) {
     excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
     excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
     // Metrics
-    excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
     excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
-    excludeCategories 'org.apache.beam.sdk.testing.UsesCounterMetrics'
-    excludeCategories 'org.apache.beam.sdk.testing.UsesDistributionMetrics'
-    excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
-    excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
     // SDF
     excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
     excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
index bb69b6d..0f64b97 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
@@ -120,11 +120,9 @@ public class DoFnFunction<InputT, OutputT>
             windowingStrategy,
             doFnSchemaInformation);
 
-//    DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
     DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
         new DoFnRunnerWithMetrics<>(stepName, doFnRunner, metricsAccum);
 
-//        return new ProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
     return new ProcessContext<>(
             doFn, doFnRunnerWithMetrics, outputManager, Collections.emptyIterator())
         .processPartition(iter)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index b3316e6..e076397 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -105,7 +105,13 @@ class ParDoTranslatorBatch<InputT, OutputT>
     Coder<InputT> inputCoder = ((PCollection<InputT>) context.getInput()).getCoder();
 
     MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance();
-//    MetricsContainerStepMapAccumulator metricsAccum = null;
+
+    List<TupleTag<?>> additionalOutputTags = new ArrayList<>();
+    for (TupleTag<?> tag: outputTags) {
+      if (!tag.equals(mainOutputTag)) {
+        additionalOutputTags.add(tag);
+      }
+    }
 
     @SuppressWarnings("unchecked")
     DoFnFunction<InputT, OutputT> doFnWrapper =
@@ -116,7 +122,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
             windowingStrategy,
             sideInputStrategies,
             context.getSerializableOptions(),
-            outputTags,
+            additionalOutputTags,
             mainOutputTag,
             inputCoder,
             outputCoderMap,
@@ -125,6 +131,9 @@ class ParDoTranslatorBatch<InputT, OutputT>
 
     Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
         inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder());
+    if (outputs.entrySet().size() > 1) {
+      allOutputs.persist();
+    }
 
     for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
       pruneOutputFilteredByTag(context, allOutputs, output);

Reply via email to