This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 22d6466cae94cf482f8151a5fe6e7dde68d28d58
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Jul 18 10:58:35 2019 +0200

    Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag
---
 .../translation/batch/ParDoTranslatorBatch.java | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 46808b7..742c1b0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -133,10 +133,14 @@ class ParDoTranslatorBatch<InputT, OutputT>
         inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder());
     if (outputs.entrySet().size() > 1) {
       allOutputs.persist();
-    }
-
-    for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
-      pruneOutputFilteredByTag(context, allOutputs, output);
+      for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+        pruneOutputFilteredByTag(context, allOutputs, output);
+      }
+    } else {
+      Dataset<WindowedValue<?>> outputDataset = allOutputs.map(
+          (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>) value -> value._2,
+          EncoderHelpers.windowedValueEncoder());
+      context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset);
     }
   }
