This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 620a27a06b61fce5b3f5f15a62e05ffe3153b2ab
Author: Etienne Chauchot <[email protected]>
AuthorDate: Wed Oct 23 14:11:14 2019 +0200

    Remove Encoders based on kryo now that we call Beam coders in the runner
---
 .../translation/helpers/EncoderHelpers.java | 41 +---------------------
 1 file changed, 1 insertion(+), 40 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index c07c9dd..704b6fe 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -51,46 +51,7 @@ import scala.reflect.ClassTag$;
 /** {@link Encoders} utility class. */
 public class EncoderHelpers {
 
-  // 1. use actual class and not object to avoid Spark fallback to GenericRowWithSchema.
-  // 2. use raw class because only raw classes can be used with kryo. Cast to Class<T> to allow
-  // the type inference mechanism to infer for ex Encoder<WindowedValue<T>> to get back the type
-  // checking
-
-  /*
-   --------- Encoders for internal spark runner objects
-  */
-
-  /**
-   * Get a bytes {@link Encoder} for {@link WindowedValue}. Bytes serialisation is issued by Kryo
-   */
-  @SuppressWarnings("unchecked")
-  public static <T> Encoder<T> windowedValueEncoder() {
-    return Encoders.kryo((Class<T>) WindowedValue.class);
-  }
-
-  /** Get a bytes {@link Encoder} for {@link KV}. Bytes serialisation is issued by Kryo */
-  @SuppressWarnings("unchecked")
-  public static <T> Encoder<T> kvEncoder() {
-    return Encoders.kryo((Class<T>) KV.class);
-  }
-
-  /** Get a bytes {@link Encoder} for {@code T}. Bytes serialisation is issued by Kryo */
-  @SuppressWarnings("unchecked")
-  public static <T> Encoder<T> genericEncoder() {
-    return Encoders.kryo((Class<T>) Object.class);
-  }
-
-  /*
-   */
-  /** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo */
-  /*
-
-    public static <T1, T2> Encoder<Tuple2<T1, T2>> tuple2Encoder() {
-      return Encoders.tuple(EncoderHelpers.genericEncoder(), EncoderHelpers.genericEncoder());
-    }
-  */
-
-  /*
+  /*
    --------- Bridges from Beam Coders to Spark Encoders
   */
 
