This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 22d6466cae94cf482f8151a5fe6e7dde68d28d58
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Jul 18 10:58:35 2019 +0200

    Improve ParDo translation performance: avoid calling a filter transform when there is only one output tag
---
 .../translation/batch/ParDoTranslatorBatch.java              | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 46808b7..742c1b0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -133,10 +133,14 @@ class ParDoTranslatorBatch<InputT, OutputT>
         inputDataSet.mapPartitions(doFnWrapper, 
EncoderHelpers.tuple2Encoder());
     if (outputs.entrySet().size() > 1) {
       allOutputs.persist();
-    }
-
-    for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
-      pruneOutputFilteredByTag(context, allOutputs, output);
+      for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+        pruneOutputFilteredByTag(context, allOutputs, output);
+      }
+    } else {
+      Dataset<WindowedValue<?>> outputDataset = allOutputs.map(
+          (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, 
WindowedValue<?>>) value -> value._2,
+          EncoderHelpers.windowedValueEncoder());
+      
context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), 
outputDataset);
     }
   }
 

Reply via email to