This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8bbf991e4bded15e447f8bee7ef9e70ce9ff3ad4
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Sep 5 10:07:32 2019 +0200

    Fix beam coder lazy init using reflexion
---
 .../translation/helpers/EncoderHelpers.java                    | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index a452da0..56097b7 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -388,18 +388,16 @@ case class DecodeUsingSerializer[T](child: Expression, 
tag: ClassTag[T], kryo: B
     ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, 
func(v1 -> {
       /*
     CODE GENERATED
-    v = (coderClass) coderClass.getDeclaredConstructor().newInstance();
+    v1 = coderClass.class.getDeclaredConstructor().newInstance();
      */
-        List<String> parts = new ArrayList<>();
+      List<String> parts = new ArrayList<>();
         parts.add("");
-        parts.add(" = (");
-        parts.add(") ");
-        parts.add(".getDeclaredConstructor().newInstance();");
+        parts.add(" = ");
+        parts.add(".class.getDeclaredConstructor().newInstance();");
         StringContext sc = new 
StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
         List<Object> args = new ArrayList<>();
         args.add(v1);
         args.add(coderClass.getName());
-        args.add(coderClass.getName());
         return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
       }));
     return beamCoderInstance;

Reply via email to