This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 824b3445e99a0fc084b612b790c7d458689a4fd4
Author: Etienne Chauchot <[email protected]>
AuthorDate: Wed Oct 23 11:52:14 2019 +0200

    Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders
---
 .../spark/structuredstreaming/translation/helpers/EncoderHelpers.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index 2f3bced..c07c9dd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -101,7 +101,8 @@ public class EncoderHelpers {
   public static <T> Encoder<T> fromBeamCoder(Coder<T> beamCoder) {
     List<Expression> serialiserList = new ArrayList<>();
-    Class<T> claz = (Class<T>) Object.class;
+    Class<? super T> claz = beamCoder.getEncodedTypeDescriptor().getRawType();
+
     serialiserList.add(
         new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder));
     ClassTag<T> classTag = ClassTag$.MODULE$.apply(claz);
