This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 91e923c0dde332ea8615aa320b1c48d8c1167fc3 Author: Etienne Chauchot <[email protected]> AuthorDate: Thu Sep 5 14:02:17 2019 +0200 Cast coder instantiated by reflection --- .../structuredstreaming/translation/helpers/EncoderHelpers.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 05595f1..0751c4c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Objects; import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.spark.sql.Encoder; @@ -396,12 +397,14 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B */ List<String> parts = new ArrayList<>(); parts.add("try {"); - parts.add(" = "); - parts.add(".class.getDeclaredConstructor().newInstance();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add(" = ("); + parts.add(") "); + parts.add(".class.getDeclaredConstructor().newInstance();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List<Object> args = new ArrayList<>(); args.add(v1); - args.add(coderClass.getName()); + args.add(coderClass.getName()); + 
args.add(coderClass.getName()); return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq()); })); return beamCoderInstance;
