This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit d7c9a4a59768687ff051ab0f28462e6376648e43 Author: Etienne Chauchot <[email protected]> AuthorDate: Wed Sep 4 16:50:17 2019 +0200 Fix getting the output value in code generation --- .../translation/helpers/EncoderHelpers.java | 37 +++++++++++++--------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index dff308a..a452da0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -21,7 +21,6 @@ import static org.apache.spark.sql.types.DataTypes.BinaryType; import static scala.compat.java8.JFunction.func; import java.io.ByteArrayInputStream; -import java.lang.reflect.Array; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -42,7 +41,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block; import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator; import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext; import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; -import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; import scala.StringContext; @@ -144,34 +142,42 @@ public class EncoderHelpers { /* CODE GENERATED + byte[] ${ev.value}; try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); - final byte[] output; if ({input.isNull}) - output = null; - else - output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); + ${ev.value} = null; + else{ + $beamCoder.encode(${input.value}, baos); + ${ev.value} = baos.toByteArray(); + } } catch (Exception e) { throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); } */ List<String> parts = new ArrayList<>(); - parts.add("try {java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); final byte[] output; if ("); - parts.add(") output = null; else output ="); + parts.add("byte[] "); + parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); + parts.add(") "); + parts.add(" = null; else{"); parts.add(".encode("); - parts.add(", baos); baos.toByteArray();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add(", baos); "); + parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List<Object> args = new ArrayList<>(); + + args.add(ev.value()); args.add(input.isNull()); + args.add(ev.value()); args.add(beamCoder); args.add(input.value()); + args.add(ev.value()); Block code = (new Block.BlockHelper(sc)) .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code), input.isNull(), - new VariableValue("output", Array.class)); + return ev.copy(input.code().$plus(code), input.isNull(),ev.value()); } @@ -263,7 +269,7 @@ public class EncoderHelpers { /* CODE GENERATED: try { - final $javaType output = + final $javaType ${ev.value} = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); @@ -274,7 +280,8 @@ public class EncoderHelpers { List<String> parts = new ArrayList<>(); parts.add("try { final "); - parts.add(" output ="); + parts.add(" "); + parts.add(" ="); parts.add("?"); parts.add(":"); parts.add("("); @@ -286,6 +293,7 @@ public class EncoderHelpers { List<Object> args = new ArrayList<>(); args.add(javaType); + args.add(ev.value()); args.add(input.isNull()); args.add(CodeGenerator.defaultValue(dataType(), false)); args.add(javaType); @@ -294,8 +302,7 @@ public class EncoderHelpers { Block code = (new Block.BlockHelper(sc)) .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code), input.isNull(), - new VariableValue("output", classTag.runtimeClass())); + return ev.copy(input.code().$plus(code), input.isNull(), ev.value()); }
