This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 91e923c0dde332ea8615aa320b1c48d8c1167fc3
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Sep 5 14:02:17 2019 +0200

    Cast coder instantiated by reflection
---
 .../structuredstreaming/translation/helpers/EncoderHelpers.java  | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index 05595f1..0751c4c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Objects;
 import 
org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.spark.sql.Encoder;
@@ -396,12 +397,14 @@ case class DecodeUsingSerializer[T](child: Expression, 
tag: ClassTag[T], kryo: B
      */
       List<String> parts = new ArrayList<>();
         parts.add("try {");
-        parts.add(" = ");
-        parts.add(".class.getDeclaredConstructor().newInstance();} catch 
(Exception e) {throw new 
RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
+        parts.add(" = (");
+      parts.add(") ");
+      parts.add(".class.getDeclaredConstructor().newInstance();} catch 
(Exception e) {throw new 
RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
         StringContext sc = new 
StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
         List<Object> args = new ArrayList<>();
         args.add(v1);
-        args.add(coderClass.getName());
+      args.add(coderClass.getName());
+      args.add(coderClass.getName());
         return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq());
       }));
     return beamCoderInstance;

Reply via email to