http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java new file mode 100644 index 0000000..0a0e301 --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java @@ -0,0 +1,56 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.translation.wrappers.streaming; + +import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.util.*; +import com.google.cloud.dataflow.sdk.values.KV; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; + +public class FlinkGroupByKeyWrapper { + + /** + * Just an auxiliary interface to bypass the fact that java anonymous classes cannot implement + * multiple interfaces. + */ + private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> { + } + + public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) { + final Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder); + + return inputDataStream.keyBy( + new KeySelectorWithQueryableResultType<K, V>() { + + @Override + public K getKey(WindowedValue<KV<K, V>> value) throws Exception { + return value.getValue().getKey(); + } + + @Override + public TypeInformation<K> getProducedType() { + return keyTypeInfo; + } + }); + } +}
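For illustration only (not part of the patch): a minimal sketch of how groupStreamByKey could be driven from a standalone Flink streaming job. The class name, the sample data, and the use of fromCollection with an explicit CoderTypeInformation are assumptions made for this example; inside the runner the input stream comes from the translated upstream transform.

// Hypothetical standalone example; not part of the patch.
import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation;
import com.dataartisans.flink.dataflow.translation.wrappers.streaming.FlinkGroupByKeyWrapper;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.KV;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class GroupStreamByKeyExample {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    KvCoder<String, Integer> kvCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());

    // Elements enter the streaming translation wrapped as WindowedValue; here they sit
    // in the global window, mirroring what a source wrapper would emit.
    CoderTypeInformation<WindowedValue<KV<String, Integer>>> inputTypeInfo =
        new CoderTypeInformation<>(WindowedValue.getValueOnlyCoder(kvCoder));
    DataStream<WindowedValue<KV<String, Integer>>> input = env.fromCollection(
        Arrays.asList(
            WindowedValue.valueInGlobalWindow(KV.of("a", 1)),
            WindowedValue.valueInGlobalWindow(KV.of("a", 2)),
            WindowedValue.valueInGlobalWindow(KV.of("b", 3))),
        inputTypeInfo);

    // The wrapper extracts the KV key and exposes the key's CoderTypeInformation,
    // so Flink partitions the stream using the Dataflow key coder.
    KeyedStream<WindowedValue<KV<String, Integer>>, String> keyed =
        FlinkGroupByKeyWrapper.groupStreamByKey(input, kvCoder);

    keyed.print();
    env.execute("group-by-key-sketch");
  }
}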
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java new file mode 100644 index 0000000..200c397 --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java @@ -0,0 +1,72 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.translation.wrappers.streaming; + +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingInternals; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +import java.util.Map; + +public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> { + + private final TupleTag<?> mainTag; + private final Map<TupleTag<?>, Integer> outputLabels; + + public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) { + super(options, windowingStrategy, doFn); + this.mainTag = Preconditions.checkNotNull(mainTag); + this.outputLabels = Preconditions.checkNotNull(tagsToLabels); + } + + @Override + public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) { + checkTimestamp(inElement, timestamp); + Integer index = outputLabels.get(mainTag); + collector.collect(makeWindowedValue( + new RawUnionValue(index, output), + timestamp, + inElement.getWindows(), + inElement.getPane())); + } + + @Override + public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) { + checkTimestamp(inElement, timestamp); + Integer index = outputLabels.get(tag); + if (index != null) { + collector.collect(makeWindowedValue( + new RawUnionValue(index, output), + timestamp, + inElement.getWindows(), + inElement.getPane())); + } + } + + @Override + public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, 
Collector<WindowedValue<RawUnionValue>> outCollector) { + throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " + + "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " + + "is not available in this class."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java new file mode 100644 index 0000000..18d4249 --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java @@ -0,0 +1,89 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.translation.wrappers.streaming; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.*; +import com.google.cloud.dataflow.sdk.util.state.StateInternals; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.*; + +public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> { + + public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) { + super(options, windowingStrategy, doFn); + } + + @Override + public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) { + checkTimestamp(inElement, timestamp); + collector.collect(makeWindowedValue( + output, + timestamp, + inElement.getWindows(), + inElement.getPane())); + } + + @Override + public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) { + // ignore the side output, this can happen when a user does not register + // side outputs but then outputs using a freshly created TupleTag. 
+ throw new RuntimeException("sideOutput() is not available in ParDo.Bound()."); + } + + @Override + public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) { + return new WindowingInternals<IN, OUT>() { + @Override + public StateInternals stateInternals() { + throw new NullPointerException("StateInternals are not available for ParDo.Bound()."); + } + + @Override + public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + collector.collect(makeWindowedValue(output, timestamp, windows, pane)); + } + + @Override + public TimerInternals timerInternals() { + throw new NullPointerException("TimerInternals are not available for ParDo.Bound()."); + } + + @Override + public Collection<? extends BoundedWindow> windows() { + return inElement.getWindows(); + } + + @Override + public PaneInfo pane() { + return inElement.getPane(); + } + + @Override + public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode."); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java new file mode 100644 index 0000000..17e0746 --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -0,0 +1,76 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.dataartisans.flink.dataflow.translation.wrappers.streaming.io; + +import com.dataartisans.flink.dataflow.FlinkPipelineRunner; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; + +import javax.annotation.Nullable; +import java.util.List; + +public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> { + + private final PipelineOptions options; + private final RichParallelSourceFunction<T> flinkSource; + + public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) { + if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + options = Preconditions.checkNotNull(pipelineOptions); + flinkSource = Preconditions.checkNotNull(source); + validate(); + } + + public RichParallelSourceFunction<T> getFlinkSource() { + return this.flinkSource; + } + + @Override + public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + + @Override + public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + + @Nullable + @Override + public Coder<C> getCheckpointMarkCoder() { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + + + @Override + public void validate() { + Preconditions.checkNotNull(options); + Preconditions.checkNotNull(flinkSource); + if(!options.getRunner().equals(FlinkPipelineRunner.class)) { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + } + + @Override + public Coder<T> getDefaultOutputCoder() { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java new file mode 100644 index 0000000..2b0d6dc --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java @@ -0,0 +1,228 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.translation.wrappers.streaming.io; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import static com.google.common.base.Preconditions.checkArgument; + +public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> { + + private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of(); + + private static final long serialVersionUID = 1L; + + private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500; + + private static final int CONNECTION_TIMEOUT_TIME = 0; + + private final String hostname; + private final int port; + private final char delimiter; + private final long maxNumRetries; + private final long delayBetweenRetries; + + public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) { + this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP); + } + + public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) { + this.hostname = hostname; + this.port = port; + this.delimiter = delimiter; + this.maxNumRetries = maxNumRetries; + this.delayBetweenRetries = delayBetweenRetries; + } + + public String getHostname() { + return this.hostname; + } + + public int getPort() { + return this.port; + } + + public char getDelimiter() { + return this.delimiter; + } + + public long getMaxNumRetries() { + return this.maxNumRetries; + } + + public long getDelayBetweenRetries() { + return this.delayBetweenRetries; + } + + @Override + public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { + return Collections.<UnboundedSource<String, C>>singletonList(this); + } + + @Override + public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) { + return new UnboundedSocketReader(this); + } + + @Nullable + @Override + public Coder getCheckpointMarkCoder() { + // Flink and Dataflow have different checkpointing mechanisms. + // In our case we do not need a coder. 
+ return null; + } + + @Override + public void validate() { + checkArgument(port > 0 && port < 65536, "port is out of range"); + checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)"); + checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive"); + } + + @Override + public Coder getDefaultOutputCoder() { + return DEFAULT_SOCKET_CODER; + } + + public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable { + + private static final long serialVersionUID = 7526472295622776147L; + private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class); + + private final UnboundedSocketSource source; + + private Socket socket; + private BufferedReader reader; + + private boolean isRunning; + + private String currentRecord; + + public UnboundedSocketReader(UnboundedSocketSource source) { + this.source = source; + } + + private void openConnection() throws IOException { + this.socket = new Socket(); + this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME); + this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); + this.isRunning = true; + } + + @Override + public boolean start() throws IOException { + int attempt = 0; + while (!isRunning) { + try { + openConnection(); + LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort()); + + return advance(); + } catch (IOException e) { + LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs..."); + + if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) { + try { + Thread.sleep(this.source.getDelayBetweenRetries()); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } else { + this.isRunning = false; + break; + } + } + } + LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort()); + return false; + } + + @Override + public boolean advance() throws IOException { + final StringBuilder buffer = new StringBuilder(); + int data; + while (isRunning && (data = reader.read()) != -1) { + // check if the string is complete + if (data != this.source.getDelimiter()) { + buffer.append((char) data); + } else { + if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') { + buffer.setLength(buffer.length() - 1); + } + this.currentRecord = buffer.toString(); + buffer.setLength(0); + return true; + } + } + return false; + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + return new byte[0]; + } + + @Override + public String getCurrent() throws NoSuchElementException { + return this.currentRecord; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return Instant.now(); + } + + @Override + public void close() throws IOException { + this.reader.close(); + this.socket.close(); + this.isRunning = false; + LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + "."); + } + + @Override + public Instant getWatermark() { + return Instant.now(); + } + + @Override + public CheckpointMark getCheckpointMark() { + return null; + } + + @Override + public UnboundedSource<String, ?> getCurrentSource() { + return this.source; + } + } +} 
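For illustration only (not part of the patch): a minimal, hypothetical sketch that drives the UnboundedSocketSource reader by hand, outside any pipeline. The hostname, port, retry settings, and class name are assumptions for this example; in a real job the source is consumed through the runner's streaming translation.

// Hypothetical standalone example; not part of the patch.
import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

public class SocketSourceSketch {

  public static void main(String[] args) throws Exception {
    // Newline-delimited records from localhost:9999, retrying forever with a 1s delay.
    UnboundedSocketSource<UnboundedSource.CheckpointMark> source =
        new UnboundedSocketSource<>("localhost", 9999, '\n', -1, 1000);
    source.validate();

    PipelineOptions options = PipelineOptionsFactory.create();
    UnboundedSource.UnboundedReader<String> reader = source.createReader(options, null);

    // start() opens the connection and positions the reader on the first record, if any.
    boolean hasRecord = reader.start();
    while (hasRecord) {
      System.out.println(reader.getCurrent());
      // advance() blocks until the next delimiter and returns false when the stream ends.
      hasRecord = reader.advance();
    }
    reader.close();
  }
}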
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java new file mode 100644 index 0000000..3e248a6 --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -0,0 +1,120 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.translation.wrappers.streaming.io; + +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.joda.time.Instant; + +import java.util.Collection; + +public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements EventTimeSourceFunction<WindowedValue<T>>, Triggerable { + + private final String name; + private final UnboundedSource.UnboundedReader<T> reader; + + private StreamingRuntimeContext runtime = null; + private StreamSource.ManualWatermarkContext<T> context = null; + + private volatile boolean isRunning = false; + + public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) { + this.name = transform.getName(); + this.reader = transform.getSource().createReader(options, null); + } + + public String getName() { + return this.name; + } + + WindowedValue<T> makeWindowedValue( + T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + if (timestamp == null) { + timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); + } + + @Override + public void run(SourceContext<WindowedValue<T>> ctx) throws Exception { + if (!(ctx instanceof StreamSource.ManualWatermarkContext)) { + throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. 
" + + "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source."); + } + + context = (StreamSource.ManualWatermarkContext<T>) ctx; + runtime = (StreamingRuntimeContext) getRuntimeContext(); + + this.isRunning = reader.start(); + setNextWatermarkTimer(this.runtime); + + while (isRunning) { + + // get it and its timestamp from the source + T item = reader.getCurrent(); + Instant timestamp = reader.getCurrentTimestamp(); + + long milliseconds = timestamp.getMillis(); + + // write it to the output collector + synchronized (ctx.getCheckpointLock()) { + ctx.collectWithTimestamp(makeWindowedValue(item, timestamp, null, PaneInfo.NO_FIRING), milliseconds); + } + + // try to go to the next record + this.isRunning = reader.advance(); + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void trigger(long timestamp) throws Exception { + if (this.isRunning) { + synchronized (context.getCheckpointLock()) { + long watermarkMillis = this.reader.getWatermark().getMillis(); + context.emitWatermark(new Watermark(watermarkMillis)); + } + setNextWatermarkTimer(this.runtime); + } + } + + private void setNextWatermarkTimer(StreamingRuntimeContext runtime) { + if (this.isRunning) { + long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval(); + long timeToNextWatermark = getTimeToNextWaternark(watermarkInterval); + runtime.registerTimer(timeToNextWatermark, this); + } + } + + private long getTimeToNextWaternark(long watermarkInterval) { + return System.currentTimeMillis() + watermarkInterval; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java new file mode 100644 index 0000000..4401eb3 --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java @@ -0,0 +1,139 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state; + +import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.TimerInternals; +import com.google.cloud.dataflow.sdk.util.TimerOrElement; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.KV; +import org.joda.time.Instant; + +import java.io.IOException; +import java.io.Serializable; + +public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable { + + private TimerOrElement<WindowedValue<KV<K, VIN>>> element; + + private Instant currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; + + public TimerOrElement<WindowedValue<KV<K, VIN>>> getElement() { + return this.element; + } + + public void setElement(TimerOrElement<WindowedValue<KV<K, VIN>>> value) { + this.element = value; + } + + public void setCurrentWatermark(Instant watermark) { + checkIfValidWatermark(watermark); + this.currentWatermark = watermark; + } + + private void setCurrentWatermarkAfterRecovery(Instant watermark) { + if(!currentWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) { + throw new RuntimeException("Explicitly setting the watermark is only allowed on " + + "initialization after recovery from a node failure. Apparently this is not " + + "the case here as the watermark is already set."); + } + this.currentWatermark = watermark; + } + + @Override + public void setTimer(com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData timerKey) { + K key = element.isTimer() ? (K) element.key() : element.element().getValue().getKey(); + registerTimer(key, timerKey); + } + + protected abstract void registerTimer(K key, TimerData timerKey); + + @Override + public void deleteTimer(com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData timerKey) { + K key = element.isTimer() ? (K) element.key() : element.element().getValue().getKey(); + unregisterTimer(key, timerKey); + } + + protected abstract void unregisterTimer(K key, TimerData timerKey); + + @Override + public Instant currentProcessingTime() { + return Instant.now(); + } + + @Override + public Instant currentWatermarkTime() { + return this.currentWatermark; + } + + private void checkIfValidWatermark(Instant newWatermark) { + if (currentWatermark.isAfter(newWatermark)) { + throw new IllegalArgumentException(String.format( + "Cannot set current watermark to %s. Newer watermarks " + + "must be no earlier than the current one (%s).", + newWatermark, this.currentWatermark)); + } + } + + public void encodeTimerInternals(DoFn.ProcessContext context, + StateCheckpointWriter writer, + KvCoder<K, VIN> kvCoder, + Coder<? 
extends BoundedWindow> windowCoder) throws IOException { + if (context == null) { + throw new RuntimeException("The Context has not been initialized."); + } + + if (element != null && !element.isTimer()) { + // create the element coder + WindowedValue.WindowedValueCoder<KV<K, VIN>> elementCoder = WindowedValue + .getFullCoder(kvCoder, windowCoder); + + CoderTypeSerializer<WindowedValue<KV<K, VIN>>> serializer = + new CoderTypeSerializer<>(elementCoder); + + writer.writeByte((byte) 1); + writer.serializeObject(element.element(), serializer); + } else { + // just setting a flag to 0, meaning that there is no value. + writer.writeByte((byte) 0); + } + writer.setTimestamp(currentWatermark); + } + + public void restoreTimerInternals(StateCheckpointReader reader, + KvCoder<K, VIN> kvCoder, + Coder<? extends BoundedWindow> windowCoder) throws IOException { + + boolean isSet = (reader.getByte() == (byte) 1); + if (!isSet) { + this.element = null; + } else { + WindowedValue.WindowedValueCoder<KV<K, VIN>> elementCoder = WindowedValue + .getFullCoder(kvCoder, windowCoder); + + CoderTypeSerializer<WindowedValue<KV<K, VIN>>> serializer = + new CoderTypeSerializer<>(elementCoder); + + WindowedValue<KV<K, VIN>> elem = reader.deserializeObject(serializer); + this.element = TimerOrElement.element(elem); + } + setCurrentWatermarkAfterRecovery(reader.getTimestamp()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java new file mode 100644 index 0000000..03b8bb5 --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -0,0 +1,533 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.state.*; +import com.google.protobuf.ByteString; +import org.apache.flink.util.InstantiationUtil; +import org.joda.time.Instant; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.*; + +public class FlinkStateInternals<K> extends MergingStateInternals { + + private final K key; + + private final Coder<K> keyCoder; + + private final Combine.KeyedCombineFn<K, ?, ?, ?> combineFn; + + private final Coder<? 
extends BoundedWindow> windowCoder; + + private Instant watermarkHoldAccessor; + + public FlinkStateInternals(K key, + Coder<K> keyCoder, + Coder<? extends BoundedWindow> windowCoder, + Combine.KeyedCombineFn<K, ?, ?, ?> combineFn) { + this.key = key; + this.combineFn = combineFn; + this.windowCoder = windowCoder; + this.keyCoder = keyCoder; + } + + public Instant getWatermarkHold() { + return watermarkHoldAccessor; + } + + /** + * This is the interface state has to implement in order for it to be fault tolerant when + * executed by the FlinkPipelineRunner. + */ + private interface CheckpointableIF { + + boolean shouldPersist(); + + void persistState(StateCheckpointWriter checkpointBuilder) throws IOException; + } + + protected final StateTable inMemoryState = new StateTable() { + + @Override + protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace) { + return new StateTag.StateBinder() { + + @Override + public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) { + return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder); + } + + @Override + public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) { + return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder); + } + + @Override + public <InputT, AccumT, OutputT> CombiningValueStateInternal<InputT, AccumT, OutputT> bindCombiningValue( + StateTag<CombiningValueStateInternal<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { + return new FlinkInMemoryCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder); + } + + @Override + public <T> WatermarkStateInternal bindWatermark(StateTag<WatermarkStateInternal> address) { + return new FlinkWatermarkStateInternalImpl(encodeKey(namespace, address)); + } + }; + } + }; + + @Override + public <T extends State> T state(StateNamespace namespace, StateTag<T> address) { + return inMemoryState.get(namespace, address); + } + + public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException { + checkpointBuilder.writeInt(getNoOfElements()); + + for (State location : inMemoryState.values()) { + if (!(location instanceof CheckpointableIF)) { + throw new IllegalStateException(String.format( + "%s wasn't created by %s -- unable to persist it", + location.getClass().getSimpleName(), + getClass().getSimpleName())); + } + ((CheckpointableIF) location).persistState(checkpointBuilder); + } + } + + public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader) + throws IOException, ClassNotFoundException { + + // the number of elements to read. + int noOfElements = checkpointReader.getInt(); + for (int i = 0; i < noOfElements; i++) { + decodeState(checkpointReader, loader); + } + } + + /** + * We remove the first character which encodes the type of the stateTag ('s' for system + * and 'u' for user). For more details check out the source of + * {@link StateTags.StateTagBase#getId()}. + */ + private void decodeState(StateCheckpointReader reader, ClassLoader loader) + throws IOException, ClassNotFoundException { + + StateType stateItemType = StateType.deserialize(reader); + ByteString stateKey = reader.getTag(); + + // first decode the namespace and the tagId... 
+ String[] namespaceAndTag = stateKey.toStringUtf8().split("\\+"); + if (namespaceAndTag.length != 2) { + throw new IllegalArgumentException("Invalid stateKey " + stateKey.toString() + "."); + } + StateNamespace namespace = StateNamespaces.fromString(namespaceAndTag[0], windowCoder); + + // ... decide if it is a system or user stateTag... + char ownerTag = namespaceAndTag[1].charAt(0); + if (ownerTag != 's' && ownerTag != 'u') { + throw new RuntimeException("Invalid StateTag name."); + } + boolean isSystemTag = ownerTag == 's'; + String tagId = namespaceAndTag[1].substring(1); + + // ...then decode the coder (if there is one)... + Coder coder = null; + if (!stateItemType.equals(StateType.WATERMARK)) { + ByteString coderBytes = reader.getData(); + coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader); + } + + //... and finally, depending on the type of the state being decoded, + // 1) create the adequate stateTag, + // 2) create the state container, + // 3) restore the actual content. + switch (stateItemType) { + case VALUE: { + StateTag stateTag = StateTags.value(tagId, coder); + stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag; + FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag); + value.restoreState(reader); + break; + } + case WATERMARK: { + StateTag<WatermarkStateInternal> stateTag = StateTags.watermarkStateInternal(tagId); + stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag; + FlinkWatermarkStateInternalImpl watermark = (FlinkWatermarkStateInternalImpl) inMemoryState.get(namespace, stateTag); + watermark.restoreState(reader); + break; + } + case LIST: { + StateTag stateTag = StateTags.bag(tagId, coder); + stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag; + FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag); + bag.restoreState(reader); + break; + } + case ACCUMULATOR: { + StateTag stateTag = StateTags.combiningValue(tagId, coder, combineFn.forKey(this.key, keyCoder)); + stateTag = isSystemTag ? 
StateTags.makeSystemTagInternal(stateTag) : stateTag; + FlinkInMemoryCombiningValue<?, ?, ?> combiningValue = (FlinkInMemoryCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag); + combiningValue.restoreState(reader); + break; + } + default: + throw new RuntimeException("Unknown State Type " + stateItemType + "."); + } + } + + private ByteString encodeKey(StateNamespace namespace, StateTag<?> address) { + return ByteString.copyFromUtf8(namespace.stringKey() + "+" + address.getId()); + } + + private int getNoOfElements() { + int noOfElements = 0; + for (State state : inMemoryState.values()) { + if (!(state instanceof CheckpointableIF)) { + throw new RuntimeException("State Implementations used by the " + + "Flink Dataflow Runner should implement the CheckpointableIF interface."); + } + + if (((CheckpointableIF) state).shouldPersist()) { + noOfElements++; + } + } + return noOfElements; + } + + private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF { + + private final ByteString stateKey; + private final Coder<T> elemCoder; + + private T value = null; + + public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) { + this.stateKey = stateKey; + this.elemCoder = elemCoder; + } + + @Override + public void clear() { + value = null; + } + + @Override + public StateContents<T> get() { + return new StateContents<T>() { + @Override + public T read() { + return value; + } + }; + } + + @Override + public void set(T input) { + this.value = input; + } + + @Override + public boolean shouldPersist() { + return value != null; + } + + @Override + public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException { + if (value != null) { + + // serialize the coder. + byte[] coder = InstantiationUtil.serializeObject(elemCoder); + + // encode the value into a ByteString + ByteString.Output stream = ByteString.newOutput(); + elemCoder.encode(value, stream, Coder.Context.OUTER); + ByteString data = stream.toByteString(); + + checkpointBuilder.addValueBuilder() + .setTag(stateKey) + .setData(coder) + .setData(data); + } + } + + public void restoreState(StateCheckpointReader checkpointReader) throws IOException { + ByteString valueContent = checkpointReader.getData(); + T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER); + set(outValue); + } + } + + private final class FlinkWatermarkStateInternalImpl + implements WatermarkStateInternal, CheckpointableIF { + + private final ByteString stateKey; + + private Instant minimumHold = null; + + public FlinkWatermarkStateInternalImpl(ByteString stateKey) { + this.stateKey = stateKey; + } + + @Override + public void clear() { + // Even though we're clearing we can't remove this from the in-memory state map, since + // other users may already have a handle on this WatermarkBagInternal. 
+ minimumHold = null; + watermarkHoldAccessor = null; + } + + @Override + public StateContents<Instant> get() { + return new StateContents<Instant>() { + @Override + public Instant read() { + return minimumHold; + } + }; + } + + @Override + public void add(Instant watermarkHold) { + if (minimumHold == null || minimumHold.isAfter(watermarkHold)) { + watermarkHoldAccessor = watermarkHold; + minimumHold = watermarkHold; + } + } + + @Override + public StateContents<Boolean> isEmpty() { + return new StateContents<Boolean>() { + @Override + public Boolean read() { + return minimumHold == null; + } + }; + } + + @Override + public String toString() { + return Objects.toString(minimumHold); + } + + @Override + public boolean shouldPersist() { + return minimumHold != null; + } + + @Override + public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException { + if (minimumHold != null) { + checkpointBuilder.addWatermarkHoldsBuilder() + .setTag(stateKey) + .setTimestamp(minimumHold); + } + } + + public void restoreState(StateCheckpointReader checkpointReader) throws IOException { + Instant watermark = checkpointReader.getTimestamp(); + add(watermark); + } + } + + private final class FlinkInMemoryCombiningValue<InputT, AccumT, OutputT> + implements CombiningValueStateInternal<InputT, AccumT, OutputT>, CheckpointableIF { + + private final ByteString stateKey; + private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; + private final Coder<AccumT> accumCoder; + + private AccumT accum; + private boolean isCleared = true; + + private FlinkInMemoryCombiningValue(ByteString stateKey, + Combine.CombineFn<InputT, AccumT, OutputT> combineFn, + Coder<AccumT> accumCoder) { + Preconditions.checkNotNull(combineFn); + Preconditions.checkNotNull(accumCoder); + + this.stateKey = stateKey; + this.combineFn = combineFn; + this.accumCoder = accumCoder; + accum = combineFn.createAccumulator(); + } + + @Override + public void clear() { + accum = combineFn.createAccumulator(); + isCleared = true; + } + + @Override + public StateContents<OutputT> get() { + return new StateContents<OutputT>() { + @Override + public OutputT read() { + return combineFn.extractOutput(accum); + } + }; + } + + @Override + public void add(InputT input) { + isCleared = false; + accum = combineFn.addInput(accum, input); + } + + @Override + public StateContents<AccumT> getAccum() { + return new StateContents<AccumT>() { + @Override + public AccumT read() { + return accum; + } + }; + } + + @Override + public StateContents<Boolean> isEmpty() { + return new StateContents<Boolean>() { + @Override + public Boolean read() { + return isCleared; + } + }; + } + + @Override + public void addAccum(AccumT accum) { + isCleared = false; + this.accum = combineFn.mergeAccumulators(Arrays.asList(this.accum, accum)); + } + + @Override + public boolean shouldPersist() { + return accum != null; + } + + @Override + public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException { + if (accum != null) { + + // serialize the coder. 
+ byte[] coder = InstantiationUtil.serializeObject(accumCoder); + + // encode the accumulator into a ByteString + ByteString.Output stream = ByteString.newOutput(); + accumCoder.encode(accum, stream, Coder.Context.OUTER); + ByteString data = stream.toByteString(); + + // put the flag that the next serialized element is an accumulator + checkpointBuilder.addAccumulatorBuilder() + .setTag(stateKey) + .setData(coder) + .setData(data); + } + } + + public void restoreState(StateCheckpointReader checkpointReader) throws IOException { + ByteString valueContent = checkpointReader.getData(); + AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER); + addAccum(accum); + } + } + + private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF { + private final List<T> contents = new ArrayList<>(); + + private final ByteString stateKey; + private final Coder<T> elemCoder; + + public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) { + this.stateKey = stateKey; + this.elemCoder = elemCoder; + } + + @Override + public void clear() { + contents.clear(); + } + + @Override + public StateContents<Iterable<T>> get() { + return new StateContents<Iterable<T>>() { + @Override + public Iterable<T> read() { + return contents; + } + }; + } + + @Override + public void add(T input) { + contents.add(input); + } + + @Override + public StateContents<Boolean> isEmpty() { + return new StateContents<Boolean>() { + @Override + public Boolean read() { + return contents.isEmpty(); + } + }; + } + + @Override + public boolean shouldPersist() { + return !contents.isEmpty(); + } + + @Override + public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException { + if (!contents.isEmpty()) { + // serialize the coder. + byte[] coder = InstantiationUtil.serializeObject(elemCoder); + + checkpointBuilder.addListUpdatesBuilder() + .setTag(stateKey) + .setData(coder) + .writeInt(contents.size()); + + for (T item : contents) { + // encode the element + ByteString.Output stream = ByteString.newOutput(); + elemCoder.encode(item, stream, Coder.Context.OUTER); + ByteString data = stream.toByteString(); + + // add the data to the checkpoint. 
+ checkpointBuilder.setData(data); + } + } + } + + public void restoreState(StateCheckpointReader checkpointReader) throws IOException { + int noOfValues = checkpointReader.getInt(); + for (int j = 0; j < noOfValues; j++) { + ByteString valueContent = checkpointReader.getData(); + T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER); + add(outValue); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java new file mode 100644 index 0000000..ba8ef89 --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java @@ -0,0 +1,89 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state; + +import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer; +import com.google.protobuf.ByteString; +import org.apache.flink.core.memory.DataInputView; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class StateCheckpointReader { + + private final DataInputView input; + + public StateCheckpointReader(DataInputView in) { + this.input = in; + } + + public ByteString getTag() throws IOException { + return ByteString.copyFrom(readRawData()); + } + + public String getTagToString() throws IOException { + return input.readUTF(); + } + + public ByteString getData() throws IOException { + return ByteString.copyFrom(readRawData()); + } + + public int getInt() throws IOException { + validate(); + return input.readInt(); + } + + public byte getByte() throws IOException { + validate(); + return input.readByte(); + } + + public Instant getTimestamp() throws IOException { + validate(); + Long watermarkMillis = input.readLong(); + return new Instant(TimeUnit.MICROSECONDS.toMillis(watermarkMillis)); + } + + public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException { + return deserializeObject(keySerializer); + } + + public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException { + return objectSerializer.deserialize(input); + } + + ///////// Helper Methods /////// + + private byte[] readRawData() throws IOException { + validate(); + int size = input.readInt(); + + byte[] serData = new byte[size]; + int bytesRead = input.read(serData); + if (bytesRead != size) { + throw new RuntimeException("Error while deserializing checkpoint. 
Not enough bytes in the input stream."); + } + return serData; + } + + private void validate() { + if (this.input == null) { + throw new RuntimeException("StateBackend not initialized yet."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java new file mode 100644 index 0000000..6bc8662 --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java @@ -0,0 +1,152 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state; + +import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.TimeDomain; +import com.google.cloud.dataflow.sdk.util.TimerInternals; +import com.google.cloud.dataflow.sdk.util.state.StateNamespace; +import com.google.cloud.dataflow.sdk.util.state.StateNamespaces; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class StateCheckpointUtils { + + public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals, + StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException { + CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); + + int noOfKeys = perKeyStateInternals.size(); + writer.writeInt(noOfKeys); + for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) { + K key = keyStatePair.getKey(); + FlinkStateInternals<K> state = keyStatePair.getValue(); + + // encode the key + writer.serializeKey(key, keySerializer); + + // write the associated state + state.persistState(writer); + } + } + + public static <K> Map<K, FlinkStateInternals<K>> decodeState( + StateCheckpointReader reader, + Combine.KeyedCombineFn<K, ?, ?, ?> combineFn, + Coder<K> keyCoder, + Coder<? extends BoundedWindow> windowCoder, + ClassLoader classLoader) throws IOException, ClassNotFoundException { + + int noOfKeys = reader.getInt(); + Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys); + perKeyStateInternals.clear(); + + CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); + for (int i = 0; i < noOfKeys; i++) { + + // decode the key. 
+ K key = reader.deserializeKey(keySerializer); + + //decode the state associated to the key. + FlinkStateInternals<K> stateForKey = + new FlinkStateInternals<>(key, keyCoder, windowCoder, combineFn); + stateForKey.restoreState(reader, classLoader); + perKeyStateInternals.put(key, stateForKey); + } + return perKeyStateInternals; + } + + ////////////// Encoding/Decoding the Timers //////////////// + + + public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers, + StateCheckpointWriter writer, + Coder<K> keyCoder) throws IOException { + CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); + + int noOfKeys = allTimers.size(); + writer.writeInt(noOfKeys); + for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) { + K key = timersPerKey.getKey(); + + // encode the key + writer.serializeKey(key, keySerializer); + + // write the associated timers + Set<TimerInternals.TimerData> timers = timersPerKey.getValue(); + encodeTimerDataForKey(writer, timers); + } + } + + public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers( + StateCheckpointReader reader, + Coder<? extends BoundedWindow> windowCoder, + Coder<K> keyCoder) throws IOException { + + int noOfKeys = reader.getInt(); + Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys); + activeTimers.clear(); + + CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); + for (int i = 0; i < noOfKeys; i++) { + + // decode the key. + K key = reader.deserializeKey(keySerializer); + + // decode the associated timers. + Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder); + activeTimers.put(key, timers); + } + return activeTimers; + } + + private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException { + // encode timers + writer.writeInt(timers.size()); + for (TimerInternals.TimerData timer : timers) { + String stringKey = timer.getNamespace().stringKey(); + + writer.setTag(stringKey); + writer.setTimestamp(timer.getTimestamp()); + writer.writeInt(timer.getDomain().ordinal()); + } + } + + private static Set<TimerInternals.TimerData> decodeTimerDataForKey( + StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException { + + // decode the timers: first their number and then the content itself. 
+    int noOfTimers = reader.getInt();
+    Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers);
+    for (int i = 0; i < noOfTimers; i++) {
+      String stringKey = reader.getTagToString();
+      Instant instant = reader.getTimestamp();
+      TimeDomain domain = TimeDomain.values()[reader.getInt()];
+
+      StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder);
+      timers.add(TimerInternals.TimerData.of(namespace, instant, domain));
+    }
+    return timers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
new file mode 100644
index 0000000..7201112
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer;
+import com.google.protobuf.ByteString;
+import org.apache.flink.runtime.state.StateBackend;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class StateCheckpointWriter {
+
+  private final StateBackend.CheckpointStateOutputView output;
+
+  public static StateCheckpointWriter create(StateBackend.CheckpointStateOutputView output) {
+    return new StateCheckpointWriter(output);
+  }
+
+  private StateCheckpointWriter(StateBackend.CheckpointStateOutputView output) {
+    this.output = output;
+  }
+
+  ///////// Creating the serialized versions of the different types of state held by dataflow ///////
+
+  public StateCheckpointWriter addValueBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.VALUE, this);
+    return this;
+  }
+
+  public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.WATERMARK, this);
+    return this;
+  }
+
+  public StateCheckpointWriter addListUpdatesBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.LIST, this);
+    return this;
+  }
+
+  public StateCheckpointWriter addAccumulatorBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.ACCUMULATOR, this);
+    return this;
+  }
+
+  ///////// Setting the tag for a given state element ///////
+
+  public StateCheckpointWriter setTag(ByteString stateKey) throws IOException {
+    return writeData(stateKey.toByteArray());
+  }
+
+  public StateCheckpointWriter setTag(String stateKey) throws IOException {
+    output.writeUTF(stateKey);
+    return this;
+  }
+
+
+  public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException {
+    return serializeObject(key, keySerializer);
+  }
+
+  public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException {
+    objectSerializer.serialize(object, output);
+    return this;
+  }
+
+  ///////// Write the actual serialized data //////////
+
+  public StateCheckpointWriter setData(ByteString data) throws IOException {
+    return writeData(data.toByteArray());
+  }
+
+  public StateCheckpointWriter setData(byte[] data) throws IOException {
+    return writeData(data);
+  }
+
+  public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException {
+    validate();
+    output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
+    return this;
+  }
+
+  public StateCheckpointWriter writeInt(int number) throws IOException {
+    validate();
+    output.writeInt(number);
+    return this;
+  }
+
+  public StateCheckpointWriter writeByte(byte b) throws IOException {
+    validate();
+    output.writeByte(b);
+    return this;
+  }
+
+  ///////// Helper Methods ///////
+
+  private StateCheckpointWriter writeData(byte[] data) throws IOException {
+    validate();
+    output.writeInt(data.length);
+    output.write(data);
+    return this;
+  }
+
+  private void validate() {
+    if (this.output == null) {
+      throw new RuntimeException("StateBackend not initialized yet.");
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
new file mode 100644
index 0000000..11446ea
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import java.io.IOException;
+
+public enum StateType {
+
+  VALUE(0),
+
+  WATERMARK(1),
+
+  LIST(2),
+
+  ACCUMULATOR(3);
+
+  private final int numVal;
+
+  StateType(int value) {
+    this.numVal = value;
+  }
+
+  public static void serialize(StateType type, StateCheckpointWriter output) throws IOException {
+    if (output == null) {
+      throw new IllegalArgumentException("Cannot write to a null output.");
+    }
+
+    if (type.numVal < 0 || type.numVal > 3) {
+      throw new RuntimeException("Unknown State Type " + type + ".");
+    }
+
+    output.writeByte((byte) type.numVal);
+  }
+
+  public static StateType deserialize(StateCheckpointReader input) throws IOException {
+    if (input == null) {
+      throw new IllegalArgumentException("Cannot read from a null input.");
+    }
+
+    int typeInt = (int) input.getByte();
+    if (typeInt < 0 || typeInt > 3) {
+      throw new RuntimeException("Unknown State Type " + typeInt + ".");
+    }
+
+    StateType resultType = null;
+    for (StateType st : values()) {
+      if (st.numVal == typeInt) {
+        resultType = st;
+        break;
+      }
+    }
+    return resultType;
+  }
+}
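
For illustration, a minimal sketch of how the write path composes: StateType marker bytes plus StateCheckpointWriter form a small fluent API for persisting one state cell. The variables 'out' (the StateBackend.CheckpointStateOutputView obtained from Flink at checkpoint time) and 'stateBytes' (the coder-encoded cell contents) are assumptions, and the actual FlinkStateInternals.persistState may order the fields differently; only the API calls themselves come from the classes above.

    // 'out' and 'stateBytes' are assumed; see note above.
    StateCheckpointWriter writer = StateCheckpointWriter.create(out);
    writer.addValueBuilder()         // writes the StateType.VALUE marker byte (0)
          .setTag("my-state-tag")    // tag written via writeUTF
          .setData(stateBytes);      // length-prefixed byte payload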

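For illustration, a hedged sketch of how the two halves of StateCheckpointUtils are meant to pair up around a Flink checkpoint. The 'out' view, the 'reader' over the restored bytes, and the maps, coders, combineFn and classLoader variables are assumptions here; only the StateCheckpointUtils and StateCheckpointWriter calls and their signatures are taken from the code above.

    // snapshot path: serialize per-key Dataflow state and timers into the checkpoint stream
    StateCheckpointWriter writer = StateCheckpointWriter.create(out);
    StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder);
    StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder);

    // restore path: rebuild the same maps from a StateCheckpointReader over the checkpointed bytes
    Map<K, FlinkStateInternals<K>> restoredState =
        StateCheckpointUtils.decodeState(reader, combineFn, keyCoder, windowCoder, classLoader);
    Map<K, Set<TimerInternals.TimerData>> restoredTimers =
        StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);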