This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit ebbab698a32c2c3b721f21a9805ad99927246f22
Author: Etienne Chauchot <[email protected]>
AuthorDate: Fri Dec 7 12:08:51 2018 +0100

    Apply spotless
---
 .../translation/TranslationContext.java           |   7 --
 .../batch/ReadSourceTranslatorBatch.java          |   4 -
 .../translation/io/DatasetSource.java             | 109 +++++++++++++--------
 3 files changed, 68 insertions(+), 52 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 52ed11f..0f2493d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -33,18 +33,11 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.ForeachWriter;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.execution.datasources.DataSource;
-import org.apache.spark.sql.execution.streaming.Source;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.streaming.StreamingQueryException;
 
 /**
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 05dc374..63f2fdf 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -19,14 +19,12 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.IOException;
 import org.apache.beam.runners.core.construction.ReadTranslation;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.sql.Dataset;
@@ -57,6 +55,4 @@ class ReadSourceTranslatorBatch<T>
 
     context.putDataset(output, dataset);
   }
-
-
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index 60bdab6..f230a70 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.spark.structuredstreaming.translation.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -25,8 +42,8 @@ import org.apache.spark.sql.types.StructType;
 
 /**
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
- * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
- * This class is just a mix-in.
+ * is tagged experimental in spark, this class does not implement {@link ContinuousReadSupport}. This
+ * class is just a mix-in.
  */
 public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
 
@@ -41,79 +58,87 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
     this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
     checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
     this.bundleSize = context.getOptions().getBundleSize();
-
   }
 
-  @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema,
-      String checkpointLocation, DataSourceOptions options) {
+  @Override
+  public MicroBatchReader createMicroBatchReader(
+      Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
     return new DatasetMicroBatchReader(schema, checkpointLocation, options);
   }
 
-  /**
-   * This class can be mapped to Beam {@link BoundedSource}.
-   */
+  /** This class can be mapped to Beam {@link BoundedSource}. */
   private class DatasetMicroBatchReader implements MicroBatchReader {
 
     private Optional<StructType> schema;
     private String checkpointLocation;
     private DataSourceOptions options;
 
-    private DatasetMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
-        DataSourceOptions options) {
+    private DatasetMicroBatchReader(
+        Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
       //TODO deal with schema and options
     }
 
-    @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    @Override
+    public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
       //TODO extension point for SDF
     }
 
-    @Override public Offset getStartOffset() {
+    @Override
+    public Offset getStartOffset() {
       //TODO extension point for SDF
       return null;
    }
 
-    @Override public Offset getEndOffset() {
+    @Override
+    public Offset getEndOffset() {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override public Offset deserializeOffset(String json) {
+    @Override
+    public Offset deserializeOffset(String json) {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override public void commit(Offset end) {
+    @Override
+    public void commit(Offset end) {
       //TODO no more to read after end Offset
     }
 
-    @Override public void stop() {
-    }
+    @Override
+    public void stop() {}
 
-    @Override public StructType readSchema() {
+    @Override
+    public StructType readSchema() {
       return null;
     }
 
-    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+    @Override
+    public List<InputPartition<InternalRow>> planInputPartitions() {
       List<InputPartition<InternalRow>> result = new ArrayList<>();
       long desiredSizeBytes;
       SparkPipelineOptions options = context.getOptions();
       try {
-        desiredSizeBytes = (bundleSize == null) ?
-            source.getEstimatedSizeBytes(options) / numPartitions :
-            bundleSize;
+        desiredSizeBytes =
+            (bundleSize == null)
+                ? source.getEstimatedSizeBytes(options) / numPartitions
+                : bundleSize;
         List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
         for (BoundedSource<T> source : sources) {
-          result.add(new InputPartition<InternalRow>() {
-
-            @Override public InputPartitionReader<InternalRow> createPartitionReader() {
-              BoundedReader<T> reader = null;
-              try {
-                reader = source.createReader(options);
-              } catch (IOException e) {
-              }
-              return new DatasetMicroBatchPartitionReader(reader);
-            }
-          });
+          result.add(
+              new InputPartition<InternalRow>() {
+
+                @Override
+                public InputPartitionReader<InternalRow> createPartitionReader() {
+                  BoundedReader<T> reader = null;
+                  try {
+                    reader = source.createReader(options);
+                  } catch (IOException e) {
+                  }
+                  return new DatasetMicroBatchPartitionReader(reader);
+                }
+              });
         }
         return result;
@@ -122,12 +147,9 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
       }
       return result;
     }
-
   }
 
-  /**
-   * This class can be mapped to Beam {@link BoundedReader}
-   */
+  /** This class can be mapped to Beam {@link BoundedReader} */
   private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
 
     BoundedReader<T> reader;
@@ -140,7 +162,8 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
       this.closed = false;
     }
 
-    @Override public boolean next() throws IOException {
+    @Override
+    public boolean next() throws IOException {
       if (!started) {
         started = true;
         return reader.start();
@@ -149,13 +172,17 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
       }
     }
 
-    @Override public InternalRow get() {
+    @Override
+    public InternalRow get() {
       List<Object> list = new ArrayList<>();
-      list.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
+      list.add(
+          WindowedValue.timestampedValueInGlobalWindow(
+              reader.getCurrent(), reader.getCurrentTimestamp()));
       return InternalRow.apply(asScalaBuffer(list).toList());
     }
 
-    @Override public void close() throws IOException {
+    @Override
+    public void close() throws IOException {
       closed = true;
       reader.close();
     }
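
A note on how this source gets picked up: Spark instantiates DataSourceV2 classes reflectively from the name passed to format(...), and since DatasetSource only implements MicroBatchReadSupport it would be driven through createMicroBatchReader(...) by the streaming reader. A minimal wiring sketch, assuming loading by fully qualified class name; the helper class and method names are illustrative, not code from this commit:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    // Hypothetical wiring sketch, not part of this commit.
    class DatasetSourceWiringSketch {
      static Dataset<Row> loadBeamSource(SparkSession sparkSession) {
        // Spark looks the class up by name; because it implements
        // MicroBatchReadSupport, each micro-batch goes through
        // createMicroBatchReader(...).
        return sparkSession
            .readStream()
            .format("org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource")
            .load();
      }
    }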
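The partition-sizing rule in planInputPartitions() divides the source's estimated size evenly across Spark's default parallelism unless a bundle size is set on SparkPipelineOptions. A standalone sketch of that arithmetic with assumed example numbers (nothing below is taken from the commit):

    // Self-contained sizing sketch; all values are invented for illustration.
    public class BundleSizingSketch {
      public static void main(String[] args) {
        long estimatedSizeBytes = 64L * 1024 * 1024; // assume getEstimatedSizeBytes(...) = 64 MB
        int numPartitions = 4;                       // assume defaultParallelism() = 4
        Long bundleSize = null;                      // assume getBundleSize() is unset

        long desiredSizeBytes =
            (bundleSize == null) ? estimatedSizeBytes / numPartitions : bundleSize;

        // 64 MB / 4 = 16 MB per bundle; source.split(desiredSizeBytes, options)
        // would then yield roughly one BoundedSource per Spark partition.
        System.out.println(desiredSizeBytes); // 16777216
      }
    }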
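DatasetMicroBatchPartitionReader adapts Beam's pull-based reader protocol to Spark's InputPartitionReader: start() positions the reader on the first record, advance() moves to the next, and next()/get() mirror them. A minimal self-contained sketch of that protocol using only the Beam SDK; the drain(...) helper is illustrative, not part of this commit:

    import java.io.IOException;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.util.WindowedValue;

    // Illustrative sketch of the BoundedReader protocol, not code from this commit.
    class BoundedReaderProtocolSketch {
      static <T> void drain(BoundedSource<T> source, PipelineOptions options) throws IOException {
        // Source.Reader implements AutoCloseable, so try-with-resources covers
        // the close() path that the partition reader handles explicitly.
        try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
          // start() returns true if a first record is available; advance()
          // returns true while more records remain.
          for (boolean more = reader.start(); more; more = reader.advance()) {
            // Wrap each element the way get() does: as a timestamped value in
            // the global window.
            WindowedValue<T> element =
                WindowedValue.timestampedValueInGlobalWindow(
                    reader.getCurrent(), reader.getCurrentTimestamp());
            System.out.println(element);
          }
        }
      }
    }

One design point worth flagging for a follow-up: createPartitionReader() in the diff swallows the IOException from source.createReader(options) and would hand a null reader to DatasetMicroBatchPartitionReader; rethrowing (for example as a RuntimeException) would fail fast instead.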
