This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 6a278395d77b3578da10a9621c85883a2d6f2ded Author: Etienne Chauchot <[email protected]> AuthorDate: Wed Oct 23 11:45:39 2019 +0200 Use beam encoders also in the output of the source translation --- .../translation/batch/ReadSourceTranslatorBatch.java | 4 +--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index ceb87cf..6af7f55 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -77,9 +77,7 @@ class ReadSourceTranslatorBatch<T> Dataset<WindowedValue<T>> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), - // using kryo bytes serialization because the mapper already calls - // windowedValueCoder.decode, no need to call it also in the Spark encoder - EncoderHelpers.windowedValueEncoder()); + EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection<T> output = (PCollection<T>) context.getOutput(); context.putDataset(output, dataset); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index 9f1e34d..ea10272 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -77,9 +77,7 @@ class ReadSourceTranslatorStreaming<T> Dataset<WindowedValue<T>> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), - // using kryo bytes serialization because the mapper already calls - // windowedValueCoder.decode, no need to call it also in the Spark encoder - EncoderHelpers.windowedValueEncoder()); + EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection<T> output = (PCollection<T>) context.getOutput(); context.putDataset(output, dataset);
