This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit c6f2ac9b21f7cfb9e1e81675cdf7f511b794559d Author: Etienne Chauchot <[email protected]> AuthorDate: Thu Sep 5 15:35:34 2019 +0200 Apply spotless and checkstyle and add javadocs --- .../translation/helpers/EncoderHelpers.java | 137 +++++++++++++-------- .../structuredstreaming/utils/EncodersTest.java | 32 +++-- 2 files changed, 113 insertions(+), 56 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index c9ab435..f990121 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -89,21 +89,31 @@ public class EncoderHelpers { --------- Bridges from Beam Coders to Spark Encoders */ - /** A way to construct encoders using generic serializers. */ - public static <T> Encoder<T> fromBeamCoder(Coder<T> beamCoder/*, Class<T> claz*/){ + /** + * Wrap a Beam coder into a Spark Encoder using Catalyst Expression Encoders (which uses java code + * generation). 
+ */ + public static <T> Encoder<T> fromBeamCoder(Coder<T> beamCoder) { List<Expression> serialiserList = new ArrayList<>(); Class<T> claz = (Class<T>) Object.class; - serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); + serialiserList.add( + new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); ClassTag<T> classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), - new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), + new DecodeUsingBeamCoder<>( + new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), classTag); } + /** + * Catalyst Expression that serializes elements using Beam {@link Coder}. + * + * @param <T> Type of elements to be serialized. + */ public static class EncodeUsingBeamCoder<T> extends UnaryExpression implements NonSQLExpression { private Expression child; @@ -114,13 +124,16 @@ public class EncoderHelpers { this.beamCoder = beamCoder; } - @Override public Expression child() { + @Override + public Expression child() { return child; } - @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { + @Override + public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. 
- String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); + String accessCode = + ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); ExprCode input = child.genCode(ctx); /* @@ -140,14 +153,17 @@ public class EncoderHelpers { */ List<String> parts = new ArrayList<>(); parts.add("byte[] "); - parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); + parts.add( + ";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); parts.add(") "); parts.add(" = null; else{"); parts.add(".encode("); parts.add(", baos); "); - parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add( + " = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); - StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); + StringContext sc = + new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List<Object> args = new ArrayList<>(); @@ -157,18 +173,19 @@ public class EncoderHelpers { args.add(accessCode); args.add(input.value()); args.add(ev.value()); - Block code = (new Block.BlockHelper(sc)) - .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); + Block code = + (new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code), input.isNull(),ev.value()); + return ev.copy(input.code().$plus(code), input.isNull(), ev.value()); } - - @Override public DataType dataType() { + @Override + public DataType dataType() { return BinaryType; } - @Override public Object productElement(int n) { + @Override + public Object productElement(int n) { switch (n) { case 0: return child; @@ -179,15 +196,18 @@ public class EncoderHelpers { } } - @Override public int 
productArity() { + @Override + public int productArity() { return 2; } - @Override public boolean canEqual(Object that) { + @Override + public boolean canEqual(Object that) { return (that instanceof EncodeUsingBeamCoder); } - @Override public boolean equals(Object o) { + @Override + public boolean equals(Object o) { if (this == o) { return true; } @@ -198,12 +218,18 @@ public class EncoderHelpers { return beamCoder.equals(that.beamCoder) && child.equals(that.child); } - @Override public int hashCode() { + @Override + public int hashCode() { return Objects.hash(super.hashCode(), child, beamCoder); } } - public static class DecodeUsingBeamCoder<T> extends UnaryExpression implements NonSQLExpression{ + /** + * Catalyst Expression that deserializes elements using Beam {@link Coder}. + * + * @param <T> Type of elements to be deserialized. + */ + public static class DecodeUsingBeamCoder<T> extends UnaryExpression implements NonSQLExpression { private Expression child; private ClassTag<T> classTag; @@ -215,28 +241,31 @@ public class EncoderHelpers { this.beamCoder = beamCoder; } - @Override public Expression child() { + @Override + public Expression child() { return child; } - @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { + @Override + public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to deserialize. - String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); + String accessCode = + ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); ExprCode input = child.genCode(ctx); String javaType = CodeGenerator.javaType(dataType()); -/* - CODE GENERATED: - final $javaType ${ev.value} - try { - ${ev.value} = - ${input.isNull} ? 
- ${CodeGenerator.defaultValue(dataType)} : - ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); - } catch (Exception e) { - throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); - } -*/ + /* + CODE GENERATED: + final $javaType ${ev.value} + try { + ${ev.value} = + ${input.isNull} ? + ${CodeGenerator.defaultValue(dataType)} : + ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); + } catch (Exception e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); + } + */ List<String> parts = new ArrayList<>(); parts.add("final "); @@ -247,9 +276,11 @@ public class EncoderHelpers { parts.add(": ("); parts.add(") "); parts.add(".decode(new java.io.ByteArrayInputStream("); - parts.add(")); } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add( + ")); } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); - StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); + StringContext sc = + new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List<Object> args = new ArrayList<>(); args.add(javaType); @@ -260,14 +291,14 @@ public class EncoderHelpers { args.add(javaType); args.add(accessCode); args.add(input.value()); - Block code = (new Block.BlockHelper(sc)) - .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); + Block code = + (new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq()); return ev.copy(input.code().$plus(code), input.isNull(), ev.value()); - } - @Override public Object nullSafeEval(Object input) { + @Override + public Object nullSafeEval(Object input) { try { return beamCoder.decode(new ByteArrayInputStream((byte[]) input)); } catch (Exception e) { @@ -275,11 +306,13 @@ public class EncoderHelpers { } } - @Override public DataType 
dataType() { + @Override + public DataType dataType() { return new ObjectType(classTag.runtimeClass()); } - @Override public Object productElement(int n) { + @Override + public Object productElement(int n) { switch (n) { case 0: return child; @@ -292,15 +325,18 @@ public class EncoderHelpers { } } - @Override public int productArity() { + @Override + public int productArity() { return 3; } - @Override public boolean canEqual(Object that) { + @Override + public boolean canEqual(Object that) { return (that instanceof DecodeUsingBeamCoder); } - @Override public boolean equals(Object o) { + @Override + public boolean equals(Object o) { if (this == o) { return true; } @@ -308,10 +344,13 @@ public class EncoderHelpers { return false; } DecodeUsingBeamCoder<?> that = (DecodeUsingBeamCoder<?>) o; - return child.equals(that.child) && classTag.equals(that.classTag) && beamCoder.equals(that.beamCoder); + return child.equals(that.child) + && classTag.equals(that.classTag) + && beamCoder.equals(that.beamCoder); } - @Override public int hashCode() { + @Override + public int hashCode() { return Objects.hash(super.hashCode(), child, classTag, beamCoder); } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java index c6b8631..8327fd8 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.spark.structuredstreaming.utils; import static org.junit.Assert.assertEquals; @@ -12,22 +29,23 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +/** Test of the wrapping of Beam Coders as Spark ExpressionEncoders. */ @RunWith(JUnit4.class) -/** - * Test of the wrapping of Beam Coders as Spark ExpressionEncoders. - */ public class EncodersTest { @Test public void beamCoderToSparkEncoderTest() { - SparkSession sparkSession = SparkSession.builder().appName("beamCoderToSparkEncoderTest") - .master("local[4]").getOrCreate(); + SparkSession sparkSession = + SparkSession.builder() + .appName("beamCoderToSparkEncoderTest") + .master("local[4]") + .getOrCreate(); List<Integer> data = new ArrayList<>(); data.add(1); data.add(2); data.add(3); - Dataset<Integer> dataset = sparkSession - .createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); + Dataset<Integer> dataset = + sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); List<Integer> results = dataset.collectAsList(); assertEquals(data, results); }
