Flink sink implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc847a95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc847a95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc847a95

Branch: refs/heads/master
Commit: bc847a9582447372461c5cf35450ba4a4c3d490d
Parents: 4fd9d74
Author: Maximilian Michels <[email protected]>
Authored: Fri Apr 22 12:33:26 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Fri Apr 29 17:58:00 2016 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     |  33 +++-
 .../streaming/io/UnboundedFlinkSink.java        | 127 ++++++++++++++++
 2 files changed, 156 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc847a95/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 927c3a2..db24f9d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -26,13 +26,16 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupBy
 import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundMultiWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -64,12 +67,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
-import org.apache.flink.streaming.api.functions.TimestampAssigner;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
-import org.apache.kafka.common.utils.Time;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +103,9 @@ public class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
     TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
     TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
+
+    TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+
     TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
     TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
     TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
@@ -193,6 +195,29 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
+  private static class WriteSinkStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
+
+    @Override
+    public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
+      String name = transform.getName();
+      PValue input = context.getInput(transform);
+
+      Sink<T> sink = transform.getSink();
+      if (!(sink instanceof UnboundedFlinkSink)) {
+        throw new UnsupportedOperationException("Currently, only UnboundedFlinkSink is supported for streaming writes.");
+      }
+
+      DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input);
+
+      inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
+        @Override
+        public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception {
+          out.collect(value.getValue());
+        }
+      }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSink()).name(name);
+    }
+  }
+
   private static class BoundedReadSourceTranslator<T>
       implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc847a95/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
new file mode 100644
index 0000000..77c195a
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * A wrapper translating Flink sinks implementing the {@link SinkFunction} interface into
+ * unbounded Beam sinks (see {@link Sink}).
+ *
+ * <p>The Flink streaming runner translates a {@code Write.Bound} that uses this sink directly
+ * into a Flink {@code DataStream} sink backed by {@link #getFlinkSink()}. The Beam
+ * {@link Sink.WriteOperation} and {@link Sink.Writer} returned here only exist to satisfy the
+ * {@link Sink} API: they do no work, and the writer rejects records so that running this sink
+ * through any other translation path fails loudly instead of silently dropping data.
+ */
+public class UnboundedFlinkSink<T> extends Sink<T> {
+
+  /** The wrapped Flink sink function. */
+  private final SinkFunction<T> flinkSink;
+
+  private UnboundedFlinkSink(SinkFunction<T> flinkSink) {
+    this.flinkSink = Preconditions.checkNotNull(flinkSink, "flinkSink");
+  }
+
+  /** Returns the wrapped Flink {@link SinkFunction}. */
+  public SinkFunction<T> getFlinkSink() {
+    return this.flinkSink;
+  }
+
+  /**
+   * Returns the wrapped Flink {@link SinkFunction}.
+   *
+   * @deprecated Misnamed; use {@link #getFlinkSink()} instead.
+   */
+  @Deprecated
+  public SinkFunction<T> getFlinkSource() {
+    return getFlinkSink();
+  }
+
+  @Override
+  public void validate(PipelineOptions options) {
+  }
+
+  @Override
+  public WriteOperation<T, ?> createWriteOperation(PipelineOptions options) {
+    return new WriteOperation<T, Void>() {
+      @Override
+      public void initialize(PipelineOptions options) throws Exception {
+      }
+
+      @Override
+      public void finalize(Iterable<Void> writerResults, PipelineOptions options)
+          throws Exception {
+      }
+
+      @Override
+      public Coder<Void> getWriterResultCoder() {
+        // Writers produce no results, so the result coder only has to encode null.
+        return VoidCoder.of();
+      }
+
+      @Override
+      public Writer<T, Void> createWriter(PipelineOptions options) throws Exception {
+        // Captured so the anonymous writer can report its owning write operation.
+        final WriteOperation<T, Void> writeOperation = this;
+        return new Writer<T, Void>() {
+          @Override
+          public void open(String uId) throws Exception {
+          }
+
+          @Override
+          public void write(T value) throws Exception {
+            throw new UnsupportedOperationException(
+                "UnboundedFlinkSink can only be written to by the Flink streaming runner.");
+          }
+
+          @Override
+          public Void close() throws Exception {
+            return null;
+          }
+
+          @Override
+          public WriteOperation<T, Void> getWriteOperation() {
+            return writeOperation;
+          }
+        };
+      }
+
+      @Override
+      public Sink<T> getSink() {
+        return UnboundedFlinkSink.this;
+      }
+    };
+  }
+
+  /**
+   * Creates a Flink sink to write to using the Write API.
+   *
+   * @param flinkSink The Flink sink, e.g. FlinkKafkaProducer09
+   * @param <T> The input type of the sink
+   * @return A Beam sink wrapping a Flink sink
+   */
+  public static <T> Sink<T> of(SinkFunction<T> flinkSink) {
+    return new UnboundedFlinkSink<>(flinkSink);
+  }
+}
