This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 62a87b62a953221ccb465ce83dc2ab095d9d49a4 Author: Etienne Chauchot <[email protected]> AuthorDate: Thu Oct 24 11:58:01 2019 +0200 Apply spotless, fix typo and javadoc --- .../batch/GroupByKeyTranslatorBatch.java | 8 ++-- .../batch/WindowAssignTranslatorBatch.java | 6 +-- .../translation/helpers/EncoderHelpers.java | 16 +++---- .../translation/helpers/MultiOuputCoder.java | 51 +++++++++++++++++----- .../translation/helpers/RowHelpers.java | 2 +- .../metrics/sink/SparkMetricsSinkTest.java | 2 +- 6 files changed, 58 insertions(+), 27 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 2970aa7..3ebe477 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -62,8 +62,7 @@ class GroupByKeyTranslatorBatch<K, V> // group by key only Coder<K> keyCoder = kvCoder.getKeyCoder(); KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly = - input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder( - keyCoder)); + input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder)); // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable Coder<V> valueCoder = kvCoder.getValueCoder(); @@ -92,8 +91,9 @@ class GroupByKeyTranslatorBatch<K, V> EncoderHelpers.fromBeamCoder(KvCoder.of(keyCoder, iterableCoder))); // group also by windows - WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder = WindowedValue.FullWindowedValueCoder - .of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), + WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder = + WindowedValue.FullWindowedValueCoder.of( + KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), windowingStrategy.getWindowFn().windowCoder()); Dataset<WindowedValue<KV<K, Iterable<V>>>> output = materialized.flatMap( diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java index 576b914..4ac8a3f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java @@ -46,12 +46,12 @@ class WindowAssignTranslatorBatch<T> context.putDataset(output, inputDataset); } else { WindowFn<T, ?> windowFn = assignTransform.getWindowFn(); - WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder = WindowedValue.FullWindowedValueCoder - .of(input.getCoder(), windowFn.windowCoder()); + WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = + WindowedValue.FullWindowedValueCoder.of(input.getCoder(), windowFn.windowCoder()); Dataset<WindowedValue<T>> outputDataset = inputDataset.map( WindowingHelpers.assignWindowsMapFunction(windowFn), - EncoderHelpers.fromBeamCoder(windoweVdalueCoder)); + EncoderHelpers.fromBeamCoder(windowedValueCoder)); context.putDataset(output, outputDataset); } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index a4f0320..2f3bced 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -44,7 +44,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; import scala.StringContext; -import scala.Tuple2; import scala.collection.JavaConversions; import scala.reflect.ClassTag; import scala.reflect.ClassTag$; @@ -81,14 +80,15 @@ public class EncoderHelpers { return Encoders.kryo((Class<T>) Object.class); } -/* - */ -/** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo *//* + /* + */ + /** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo */ + /* - public static <T1, T2> Encoder<Tuple2<T1, T2>> tuple2Encoder() { - return Encoders.tuple(EncoderHelpers.genericEncoder(), EncoderHelpers.genericEncoder()); - } -*/ + public static <T1, T2> Encoder<Tuple2<T1, T2>> tuple2Encoder() { + return Encoders.tuple(EncoderHelpers.genericEncoder(), EncoderHelpers.genericEncoder()); + } + */ /* --------- Bridges from Beam Coders to Spark Encoders diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java index caaea01..82f0e4f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import java.io.IOException; @@ -12,37 +29,51 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; import scala.Tuple2; +/** + * Coder to serialize and deserialize {@code}Tuple2<TupleTag<T>, WindowedValue<T>{/@code} to be used + * in spark encoders while applying {@link org.apache.beam.sdk.transforms.DoFn}. + * + * @param <T> type of the elements in the collection + */ public class MultiOuputCoder<T> extends CustomCoder<Tuple2<TupleTag<T>, WindowedValue<T>>> { Coder<TupleTag> tupleTagCoder; Map<TupleTag<?>, Coder<?>> coderMap; Coder<? extends BoundedWindow> windowCoder; - public static MultiOuputCoder of(Coder<TupleTag> tupleTagCoder, Map<TupleTag<?>, Coder<?>> coderMap, Coder<? extends BoundedWindow> windowCoder) { + public static MultiOuputCoder of( + Coder<TupleTag> tupleTagCoder, + Map<TupleTag<?>, Coder<?>> coderMap, + Coder<? extends BoundedWindow> windowCoder) { return new MultiOuputCoder(tupleTagCoder, coderMap, windowCoder); } - private MultiOuputCoder(Coder<TupleTag> tupleTagCoder, Map<TupleTag<?>, Coder<?>> coderMap, Coder<? extends BoundedWindow> windowCoder) { + private MultiOuputCoder( + Coder<TupleTag> tupleTagCoder, + Map<TupleTag<?>, Coder<?>> coderMap, + Coder<? extends BoundedWindow> windowCoder) { this.tupleTagCoder = tupleTagCoder; this.coderMap = coderMap; this.windowCoder = windowCoder; } - @Override public void encode(Tuple2<TupleTag<T>, WindowedValue<T>> tuple2, OutputStream outStream) + @Override + public void encode(Tuple2<TupleTag<T>, WindowedValue<T>> tuple2, OutputStream outStream) throws IOException { TupleTag<T> tupleTag = tuple2._1(); tupleTagCoder.encode(tupleTag, outStream); - Coder<T> valueCoder = (Coder<T>)coderMap.get(tupleTag); - WindowedValue.FullWindowedValueCoder<T> wvCoder = WindowedValue.FullWindowedValueCoder - .of(valueCoder, windowCoder); + Coder<T> valueCoder = (Coder<T>) coderMap.get(tupleTag); + WindowedValue.FullWindowedValueCoder<T> wvCoder = + WindowedValue.FullWindowedValueCoder.of(valueCoder, windowCoder); wvCoder.encode(tuple2._2(), outStream); } - @Override public Tuple2<TupleTag<T>, WindowedValue<T>> decode(InputStream inStream) + @Override + public Tuple2<TupleTag<T>, WindowedValue<T>> decode(InputStream inStream) throws CoderException, IOException { TupleTag<T> tupleTag = (TupleTag<T>) tupleTagCoder.decode(inStream); - Coder<T> valueCoder = (Coder<T>)coderMap.get(tupleTag); - WindowedValue.FullWindowedValueCoder<T> wvCoder = WindowedValue.FullWindowedValueCoder - .of(valueCoder, windowCoder); + Coder<T> valueCoder = (Coder<T>) coderMap.get(tupleTag); + WindowedValue.FullWindowedValueCoder<T> wvCoder = + WindowedValue.FullWindowedValueCoder.of(valueCoder, windowCoder); WindowedValue<T> wv = wvCoder.decode(inStream); return Tuple2.apply(tupleTag, wv); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java index ac74c29..afb4922 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java @@ -53,7 +53,7 @@ public final class RowHelpers { } /** - * Serialize a windowedValue to bytes using windowed {@link WindowedValue.FullWindowedValueCoder} + * Serialize a windowedValue to bytes using windowedValueCoder {@link WindowedValue.FullWindowedValueCoder} * and stores it an InternalRow. */ public static <T> InternalRow storeWindowedValueInRow( diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java index 9d56f0c..de405a4 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java @@ -41,7 +41,7 @@ import org.junit.rules.ExternalResource; * <p>A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and * streaming modes. */ -@Ignore ("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a") +@Ignore("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a") public class SparkMetricsSinkTest { @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); @Rule public final TestPipeline pipeline = TestPipeline.create();
