This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2c94eef2f1d2677bf64851a8c22520c44557170f Author: Etienne Chauchot <[email protected]> AuthorDate: Wed Sep 4 14:54:59 2019 +0200 Fix call to scala Function1 in coder lazy init --- runners/spark/build.gradle | 1 + .../translation/helpers/EncoderHelpers.java | 28 +++++++++++----------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle index 73a710b..a948ef1 100644 --- a/runners/spark/build.gradle +++ b/runners/spark/build.gradle @@ -77,6 +77,7 @@ dependencies { provided "com.esotericsoftware.kryo:kryo:2.21" runtimeOnly library.java.jackson_module_scala runtimeOnly "org.scala-lang:scala-library:2.11.8" + compile "org.scala-lang.modules:scala-java8-compat_2.11:0.9.0" testCompile project(":sdks:java:io:kafka") testCompile project(path: ":sdks:java:core", configuration: "shadowTest") // SparkStateInternalsTest extends abstract StateInternalsTest diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index f7706cc..694bc24 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -18,9 +18,9 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; +import static scala.compat.java8.JFunction.func; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.List; @@ -370,23 +370,23 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B private static <T> String lazyInitBeamCoder(CodegenContext ctx, Class<Coder<T>> 
coderClass) { String beamCoderInstance = "beamCoder"; - ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, v -> { + ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> { /* CODE GENERATED v = (coderClass) coderClass.newInstance(); */ - List<String> parts = new ArrayList<>(); - parts.add(""); - parts.add(" = ("); - parts.add(") "); - parts.add(".newInstance();"); - StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); - List<Object> args = new ArrayList<>(); - args.add(v); - args.add(coderClass.getName()); - args.add(coderClass.getName()); - return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq()); - }); + List<String> parts = new ArrayList<>(); + parts.add(""); + parts.add(" = ("); + parts.add(") "); + parts.add(".newInstance();"); + StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); + List<Object> args = new ArrayList<>(); + args.add(v1); + args.add(coderClass.getName()); + args.add(coderClass.getName()); + return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq()); + })); return beamCoderInstance; }
