Remove some code left over from earlier refactoring

This only removes unused parts of the code.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4fe7010b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4fe7010b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4fe7010b

Branch: refs/heads/master
Commit: 4fe7010bdb7692914fc6c2821c95caea0cab770d
Parents: 36a27f5
Author: Aljoscha Krettek <[email protected]>
Authored: Sun May 29 08:04:39 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon May 30 14:23:19 2016 +0200

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |   5 -
 .../FlinkBatchTransformTranslators.java         | 131 +------------------
 .../translation/wrappers/SinkOutputFormat.java  |  97 --------------
 3 files changed, 3 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fe7010b/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 757ac9c..b60cba1 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -57,11 +57,6 @@
       <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
       <version>${flink.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-avro_2.10</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
 
     <!-- Beam -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fe7010b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 8358807..200e4af 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -28,7 +28,6 @@ import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFun
 import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
-import org.apache.beam.runners.flink.translation.wrappers.SinkOutputFormat;
 import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -36,11 +35,8 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -69,27 +65,19 @@ import com.google.common.collect.Maps;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.operators.GroupCombineOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.SingleInputUdfOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -100,7 +88,7 @@ import java.util.Map;
  * Translators for transforming {@link PTransform PTransforms} to
  * Flink {@link DataSet DataSets}.
  */
-public class FlinkBatchTransformTranslators {
+class FlinkBatchTransformTranslators {
 
   // --------------------------------------------------------------------------------------------
   //  Transform Translator Registry
@@ -128,7 +116,7 @@ public class FlinkBatchTransformTranslators {
 
   }
 
-  public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
+  static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
       PTransform<?, ?> transform) {
     return TRANSLATORS.get(transform.getClass());
   }
@@ -154,119 +142,6 @@ public class FlinkBatchTransformTranslators {
     }
   }
 
-  private static class WriteSinkTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
-
-    @Override
-    public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) {
-      String name = transform.getName();
-      PValue input = context.getInput(transform);
-      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
-
-      inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions()))
-          .name(name);
-    }
-  }
-
-  private static class AvroIOWriteTranslatorBatch<T> implements
-      FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
-    private static final Logger LOG = LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class);
-
-
-    @Override
-    public void translateNode(
-        AvroIO.Write.Bound<T> transform,
-        FlinkBatchTranslationContext context) {
-      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(context.getInput(transform));
-
-      String filenamePrefix = transform.getFilenamePrefix();
-      String filenameSuffix = transform.getFilenameSuffix();
-      int numShards = transform.getNumShards();
-      String shardNameTemplate = transform.getShardNameTemplate();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn(
-          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
-          filenameSuffix);
-      LOG.warn(
-          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
-          shardNameTemplate);
-
-      // This is super hacky, but unfortunately we cannot get the type otherwise
-      Class<T> extractedAvroType;
-      try {
-        Field typeField = transform.getClass().getDeclaredField("type");
-        typeField.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        Class<T> avroType = (Class<T>) typeField.get(transform);
-        extractedAvroType = avroType;
-      } catch (NoSuchFieldException | IllegalAccessException e) {
-        // we know that the field is there and it is accessible
-        throw new RuntimeException("Could not access type from AvroIO.Bound", e);
-      }
-
-      MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
-          new MapFunction<WindowedValue<T>, T>() {
-            @Override
-            public T map(WindowedValue<T> value) throws Exception {
-              return value.getValue();
-            }
-          }).returns(new CoderTypeInformation<>(context.getInput(transform).getCoder()));
-
-
-      DataSink<T> dataSink = valueStream.output(
-          new AvroOutputFormat<>(new Path(filenamePrefix), extractedAvroType));
-
-      if (numShards > 0) {
-        dataSink.setParallelism(numShards);
-      }
-    }
-  }
-
-  private static class TextIOWriteTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
-    private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class);
-
-    @Override
-    public void translateNode(
-        TextIO.Write.Bound<T> transform,
-        FlinkBatchTranslationContext context) {
-      PValue input = context.getInput(transform);
-      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
-
-      String filenamePrefix = transform.getFilenamePrefix();
-      String filenameSuffix = transform.getFilenameSuffix();
-      boolean needsValidation = transform.needsValidation();
-      int numShards = transform.getNumShards();
-      String shardNameTemplate = transform.getShardNameTemplate();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn(
-          "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
-          needsValidation);
-      LOG.warn(
-          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
-          filenameSuffix);
-      LOG.warn(
-          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
-          shardNameTemplate);
-
-      MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
-          new MapFunction<WindowedValue<T>, T>() {
-            @Override
-            public T map(WindowedValue<T> value) throws Exception {
-              return value.getValue();
-            }
-          }).returns(new CoderTypeInformation<>(transform.getCoder()));
-
-      DataSink<T> dataSink = valueStream.writeAsText(filenamePrefix);
-
-      if (numShards > 0) {
-        dataSink.setParallelism(numShards);
-      }
-    }
-  }
-
   private static class WindowBoundTranslatorBatch<T>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Bound<T>> {
@@ -431,7 +306,7 @@ public class FlinkBatchTransformTranslators {
   private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
     @Override
     public List<T> createAccumulator() {
-      return new ArrayList<T>();
+      return new ArrayList<>();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fe7010b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
deleted file mode 100644
index c0a7132..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.io.Sink;
-import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.WindowedValue;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.AbstractID;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-
-/**
- * Wrapper for executing a {@link Sink} on Flink as an {@link OutputFormat}.
- *
- * @param <T> The type of the incoming records.
- */
-public class SinkOutputFormat<T> implements OutputFormat<WindowedValue<T>> {
-
-  private final Sink<T> sink;
-
-  private final SerializedPipelineOptions serializedOptions;
-
-  private Sink.WriteOperation<T, ?> writeOperation;
-  private Sink.Writer<T, ?> writer;
-
-  private AbstractID uid = new AbstractID();
-
-  public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
-    this.sink = transform.getSink();
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-  }
-
-  @Override
-  public void configure(Configuration configuration) {
-    writeOperation = sink.createWriteOperation(serializedOptions.getPipelineOptions());
-    try {
-      writeOperation.initialize(serializedOptions.getPipelineOptions());
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to initialize the write operation.", e);
-    }
-  }
-
-  @Override
-  public void open(int taskNumber, int numTasks) throws IOException {
-    try {
-      writer = writeOperation.createWriter(serializedOptions.getPipelineOptions());
-    } catch (Exception e) {
-      throw new IOException("Couldn't create writer.", e);
-    }
-    try {
-      writer.open(uid + "-" + String.valueOf(taskNumber));
-    } catch (Exception e) {
-      throw new IOException("Couldn't open writer.", e);
-    }
-  }
-
-  @Override
-  public void writeRecord(WindowedValue<T> record) throws IOException {
-    try {
-      writer.write(record.getValue());
-    } catch (Exception e) {
-      throw new IOException("Couldn't write record.", e);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      writer.close();
-    } catch (Exception e) {
-      throw new IOException("Couldn't close writer.", e);
-    }
-  }
-
-}
