This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8bbf991e4bded15e447f8bee7ef9e70ce9ff3ad4
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Sep 5 10:07:32 2019 +0200

    Fix beam coder lazy init using reflexion
---
 .../translation/helpers/EncoderHelpers.java | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index a452da0..56097b7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -388,18 +388,16 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B
       ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> {
         /*
         CODE GENERATED
-        v = (coderClass) coderClass.getDeclaredConstructor().newInstance();
+        v1 = coderClass.class.getDeclaredConstructor().newInstance();
          */
-        List<String> parts = new ArrayList<>();
+        List<String> parts = new ArrayList<>();
         parts.add("");
-        parts.add(" = (");
-        parts.add(") ");
-        parts.add(".getDeclaredConstructor().newInstance();");
+        parts.add(" = ");
+        parts.add(".class.getDeclaredConstructor().newInstance();");
         StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
         List<Object> args = new ArrayList<>();
         args.add(v1);
         args.add(coderClass.getName());
-        args.add(coderClass.getName());
         return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
       }));
       return beamCoderInstance;
