This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit b7283d7810f5ac0fbbd6003dbacfd65d20458563 Author: Etienne Chauchot <[email protected]> AuthorDate: Tue Dec 11 16:21:05 2018 +0100 Improve type enforcement in ReadSourceTranslator --- .../translation/batch/ReadSourceTranslatorBatch.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index a75730a..2c1aa93 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -62,11 +63,11 @@ class ReadSourceTranslatorBatch<T> // instantiates to be able to call DatasetSource.initialize() MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() { @Override public WindowedValue<T> call(Row value) throws Exception { - //TODO fix row content extraction: I guess cast is not enough - return (WindowedValue<T>) value.get(0); + //there is only one value put in each Row by the InputPartitionReader + return value.<WindowedValue<T>>getAs(0); } }; - //TODO fix encoder + //TODO fix encoder: how to get an Encoder<WindowedValue<T>> Dataset<WindowedValue<T>> dataset = rowDataset.map(func, null); PCollection<T> output = (PCollection<T>) context.getOutput();
