http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java index eabc307..7dae0b0 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -44,221 +44,221 @@ import java.util.Collection; * */ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> { - private final DoFn<IN, OUTDF> doFn; - private final WindowingStrategy<?, ?> windowingStrategy; - private transient PipelineOptions options; - - private DoFnProcessContext context; - - public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) { - Preconditions.checkNotNull(options); - Preconditions.checkNotNull(windowingStrategy); - Preconditions.checkNotNull(doFn); - - this.doFn = doFn; - this.options = options; - this.windowingStrategy = windowingStrategy; - } - - private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) { - if (this.context == null) { - this.context = new DoFnProcessContext(function, outCollector); - } - } - - @Override - public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception { - this.initContext(doFn, out); - - // for each window the element belongs to, create a new copy here. - Collection<? extends BoundedWindow> windows = value.getWindows(); - if (windows.size() <= 1) { - processElement(value); - } else { - for (BoundedWindow window : windows) { - processElement(WindowedValue.of( - value.getValue(), value.getTimestamp(), window, value.getPane())); - } - } - } - - private void processElement(WindowedValue<IN> value) throws Exception { - this.context.setElement(value); - this.doFn.startBundle(context); - doFn.processElement(context); - this.doFn.finishBundle(context); - } - - private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext { - - private final DoFn<IN, OUTDF> fn; - - protected final Collector<WindowedValue<OUTFL>> collector; - - private WindowedValue<IN> element; - - private DoFnProcessContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) { - function.super(); - super.setupDelegateAggregators(); - - this.fn = function; - this.collector = outCollector; - } - - public void setElement(WindowedValue<IN> value) { - this.element = value; - } - - @Override - public IN element() { - return this.element.getValue(); - } - - @Override - public Instant timestamp() { - return this.element.getTimestamp(); - } - - @Override - public BoundedWindow window() { - if (!(fn instanceof DoFn.RequiresWindowAccess)) { - throw new UnsupportedOperationException( - "window() is only available in the context of a DoFn marked as RequiresWindow."); - } - - Collection<? extends BoundedWindow> windows = this.element.getWindows(); - if (windows.size() != 1) { - throw new IllegalArgumentException("Each element is expected to belong to 1 window. " + - "This belongs to " + windows.size() + "."); - } - return windows.iterator().next(); - } - - @Override - public PaneInfo pane() { - return this.element.getPane(); - } - - @Override - public WindowingInternals<IN, OUTDF> windowingInternals() { - return windowingInternalsHelper(element, collector); - } - - @Override - public PipelineOptions getPipelineOptions() { - return options; - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - throw new RuntimeException("sideInput() is not supported in Streaming mode."); - } - - @Override - public void output(OUTDF output) { - outputWithTimestamp(output, this.element.getTimestamp()); - } - - @Override - public void outputWithTimestamp(OUTDF output, Instant timestamp) { - outputWithTimestampHelper(element, output, timestamp, collector); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - sideOutputWithTimestamp(tag, output, this.element.getTimestamp()); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - sideOutputWithTimestampHelper(element, output, timestamp, collector, tag); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - Accumulator acc = getRuntimeContext().getAccumulator(name); - if (acc != null) { - AccumulatorHelper.compareAccumulatorTypes(name, - SerializableFnAggregatorWrapper.class, acc.getClass()); - return (Aggregator<AggInputT, AggOutputT>) acc; - } - - SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator = - new SerializableFnAggregatorWrapper<>(combiner); - getRuntimeContext().addAccumulator(name, accumulator); - return accumulator; - } - } - - protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) { - if (timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) { - throw new IllegalArgumentException(String.format( - "Cannot output with timestamp %s. Output timestamps must be no earlier than the " - + "timestamp of the current input (%s) minus the allowed skew (%s). See the " - + "DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.", - timestamp, ref.getTimestamp(), - PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod()))); - } - } - - protected <T> WindowedValue<T> makeWindowedValue( - T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - final Instant inputTimestamp = timestamp; - final WindowFn windowFn = windowingStrategy.getWindowFn(); - - if (timestamp == null) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - if (windows == null) { - try { - windows = windowFn.assignWindows(windowFn.new AssignContext() { - @Override - public Object element() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input element when none was available"); - } - - @Override - public Instant timestamp() { - if (inputTimestamp == null) { - throw new UnsupportedOperationException( - "WindowFn attempted to access input timestamp when none was available"); - } - return inputTimestamp; - } - - @Override - public Collection<? extends BoundedWindow> windows() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input windows when none were available"); - } - }); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - - return WindowedValue.of(output, timestamp, windows, pane); - } - - /////////// ABSTRACT METHODS TO BE IMPLEMENTED BY SUBCLASSES ///////////////// - - public abstract void outputWithTimestampHelper( - WindowedValue<IN> inElement, - OUTDF output, - Instant timestamp, - Collector<WindowedValue<OUTFL>> outCollector); - - public abstract <T> void sideOutputWithTimestampHelper( - WindowedValue<IN> inElement, - T output, - Instant timestamp, - Collector<WindowedValue<OUTFL>> outCollector, - TupleTag<T> tag); - - public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper( - WindowedValue<IN> inElement, - Collector<WindowedValue<OUTFL>> outCollector); + private final DoFn<IN, OUTDF> doFn; + private final WindowingStrategy<?, ?> windowingStrategy; + private transient PipelineOptions options; + + private DoFnProcessContext context; + + public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) { + Preconditions.checkNotNull(options); + Preconditions.checkNotNull(windowingStrategy); + Preconditions.checkNotNull(doFn); + + this.doFn = doFn; + this.options = options; + this.windowingStrategy = windowingStrategy; + } + + private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) { + if (this.context == null) { + this.context = new DoFnProcessContext(function, outCollector); + } + } + + @Override + public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception { + this.initContext(doFn, out); + + // for each window the element belongs to, create a new copy here. + Collection<? extends BoundedWindow> windows = value.getWindows(); + if (windows.size() <= 1) { + processElement(value); + } else { + for (BoundedWindow window : windows) { + processElement(WindowedValue.of( + value.getValue(), value.getTimestamp(), window, value.getPane())); + } + } + } + + private void processElement(WindowedValue<IN> value) throws Exception { + this.context.setElement(value); + this.doFn.startBundle(context); + doFn.processElement(context); + this.doFn.finishBundle(context); + } + + private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext { + + private final DoFn<IN, OUTDF> fn; + + protected final Collector<WindowedValue<OUTFL>> collector; + + private WindowedValue<IN> element; + + private DoFnProcessContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) { + function.super(); + super.setupDelegateAggregators(); + + this.fn = function; + this.collector = outCollector; + } + + public void setElement(WindowedValue<IN> value) { + this.element = value; + } + + @Override + public IN element() { + return this.element.getValue(); + } + + @Override + public Instant timestamp() { + return this.element.getTimestamp(); + } + + @Override + public BoundedWindow window() { + if (!(fn instanceof DoFn.RequiresWindowAccess)) { + throw new UnsupportedOperationException( + "window() is only available in the context of a DoFn marked as RequiresWindow."); + } + + Collection<? extends BoundedWindow> windows = this.element.getWindows(); + if (windows.size() != 1) { + throw new IllegalArgumentException("Each element is expected to belong to 1 window. " + + "This belongs to " + windows.size() + "."); + } + return windows.iterator().next(); + } + + @Override + public PaneInfo pane() { + return this.element.getPane(); + } + + @Override + public WindowingInternals<IN, OUTDF> windowingInternals() { + return windowingInternalsHelper(element, collector); + } + + @Override + public PipelineOptions getPipelineOptions() { + return options; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + throw new RuntimeException("sideInput() is not supported in Streaming mode."); + } + + @Override + public void output(OUTDF output) { + outputWithTimestamp(output, this.element.getTimestamp()); + } + + @Override + public void outputWithTimestamp(OUTDF output, Instant timestamp) { + outputWithTimestampHelper(element, output, timestamp, collector); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + sideOutputWithTimestamp(tag, output, this.element.getTimestamp()); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + sideOutputWithTimestampHelper(element, output, timestamp, collector, tag); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + Accumulator acc = getRuntimeContext().getAccumulator(name); + if (acc != null) { + AccumulatorHelper.compareAccumulatorTypes(name, + SerializableFnAggregatorWrapper.class, acc.getClass()); + return (Aggregator<AggInputT, AggOutputT>) acc; + } + + SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator = + new SerializableFnAggregatorWrapper<>(combiner); + getRuntimeContext().addAccumulator(name, accumulator); + return accumulator; + } + } + + protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) { + if (timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) { + throw new IllegalArgumentException(String.format( + "Cannot output with timestamp %s. Output timestamps must be no earlier than the " + + "timestamp of the current input (%s) minus the allowed skew (%s). See the " + + "DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.", + timestamp, ref.getTimestamp(), + PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod()))); + } + } + + protected <T> WindowedValue<T> makeWindowedValue( + T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + final Instant inputTimestamp = timestamp; + final WindowFn windowFn = windowingStrategy.getWindowFn(); + + if (timestamp == null) { + timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + if (windows == null) { + try { + windows = windowFn.assignWindows(windowFn.new AssignContext() { + @Override + public Object element() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input element when none was available"); + } + + @Override + public Instant timestamp() { + if (inputTimestamp == null) { + throw new UnsupportedOperationException( + "WindowFn attempted to access input timestamp when none was available"); + } + return inputTimestamp; + } + + @Override + public Collection<? extends BoundedWindow> windows() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input windows when none were available"); + } + }); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } + } + + return WindowedValue.of(output, timestamp, windows, pane); + } + + /////////// ABSTRACT METHODS TO BE IMPLEMENTED BY SUBCLASSES ///////////////// + + public abstract void outputWithTimestampHelper( + WindowedValue<IN> inElement, + OUTDF output, + Instant timestamp, + Collector<WindowedValue<OUTFL>> outCollector); + + public abstract <T> void sideOutputWithTimestampHelper( + WindowedValue<IN> inElement, + T output, + Instant timestamp, + Collector<WindowedValue<OUTFL>> outCollector, + TupleTag<T> tag); + + public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper( + WindowedValue<IN> inElement, + Collector<WindowedValue<OUTFL>> outCollector); } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index fb3d329..55235c9 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -68,562 +68,562 @@ import java.util.*; * for furhter processing. */ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> - extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>> - implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> { - - private static final long serialVersionUID = 1L; - - private transient PipelineOptions options; - - private transient CoderRegistry coderRegistry; - - private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator; - - private ProcessContext context; - - private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy; - - private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn; - - private final KvCoder<K, VIN> inputKvCoder; - - /** - * State is kept <b>per-key</b>. This data structure keeps this mapping between an active key, i.e. a - * key whose elements are currently waiting to be processed, and its associated state. - */ - private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(); - - /** - * Timers waiting to be processed. - */ - private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); - - private FlinkTimerInternals timerInternals = new FlinkTimerInternals(); - - /** - * Creates an DataStream where elements are grouped in windows based on the specified windowing strategy. - * This method assumes that <b>elements are already grouped by key</b>. - * <p/> - * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)} - * is that this method assumes that a combiner function is provided - * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). - * A combiner helps at increasing the speed and, in most of the cases, reduce the per-window state. - * - * @param options the general job configuration options. - * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. - * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key. - * @param combiner the combiner to be used. - * @param outputKvCoder the type of the output values. - */ - public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create( - PipelineOptions options, - PCollection input, - KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey, - Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner, - KvCoder<K, VOUT> outputKvCoder) { - Preconditions.checkNotNull(options); - - KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder(); - FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options, - input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner); - - Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of( - outputKvCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); - - CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo = - new CoderTypeInformation<>(windowedOutputElemCoder); - - DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey - .transform("GroupByWindowWithCombiner", - new CoderTypeInformation<>(outputKvCoder), - windower) - .returns(outputTypeInfo); - - return groupedByKeyAndWindow; - } - - /** - * Creates an DataStream where elements are grouped in windows based on the specified windowing strategy. - * This method assumes that <b>elements are already grouped by key</b>. - * <p/> - * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)} - * is that this method assumes no combiner function - * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). - * - * @param options the general job configuration options. - * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. - * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key. - */ - public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable( - PipelineOptions options, - PCollection input, - KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) { - Preconditions.checkNotNull(options); - - KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder(); - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); - - FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options, - input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null); - - Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder); - KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder); - - Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of( - outputElemCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); - - CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo = - new CoderTypeInformation<>(windowedOutputElemCoder); - - DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey - .transform("GroupByWindow", - new CoderTypeInformation<>(windowedOutputElemCoder), - windower) - .returns(outputTypeInfo); - - return groupedByKeyAndWindow; - } - - public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper - createForTesting(PipelineOptions options, - CoderRegistry registry, - WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy, - KvCoder<K, VIN> inputCoder, - Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) { - Preconditions.checkNotNull(options); - - return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner); - } - - private FlinkGroupAlsoByWindowWrapper(PipelineOptions options, - CoderRegistry registry, - WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy, - KvCoder<K, VIN> inputCoder, - Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) { - Preconditions.checkNotNull(options); - - this.options = Preconditions.checkNotNull(options); - this.coderRegistry = Preconditions.checkNotNull(registry); - this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder(); - this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy(); - this.combineFn = combiner; - this.operator = createGroupAlsoByWindowOperator(); - this.chainingStrategy = ChainingStrategy.ALWAYS; - } - - @Override - public void open() throws Exception { - super.open(); - this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals); - } - - /** - * Create the adequate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}, - * <b> if not already created</b>. - * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then - * a function with that combiner is created, so that elements are combined as they arrive. This is - * done for speed and (in most of the cases) for reduction of the per-window state. - */ - private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() { - if (this.operator == null) { - if (this.combineFn == null) { - // Thus VOUT == Iterable<VIN> - Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); - - this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create( - (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder)); - } else { - Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder(); - - AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn - .withInputCoder(combineFn, coderRegistry, inputKvCoder); - - this.operator = GroupAlsoByWindowViaWindowSetDoFn.create( - (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn)); - } - } - return this.operator; - } - - private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception { - context.setElement(workItem, getStateInternalsForKey(workItem.key())); - - // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded. - operator.startBundle(context); - operator.processElement(context); - operator.finishBundle(context); - } - - @Override - public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception { - ArrayList<WindowedValue<VIN>> elements = new ArrayList<>(); - elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(), - element.getValue().getWindows(), element.getValue().getPane())); - processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements)); - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - context.setCurrentInputWatermark(new Instant(mark.getTimestamp())); - - Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp()); - if (!timers.isEmpty()) { - for (K key : timers.keySet()) { - processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key))); - } - } - - /** - * This is to take into account the different semantics of the Watermark in Flink and - * in Dataflow. To understand the reasoning behind the Dataflow semantics and its - * watermark holding logic, see the documentation of - * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)} - * */ - long millis = Long.MAX_VALUE; - for (FlinkStateInternals state : perKeyStateInternals.values()) { - Instant watermarkHold = state.getWatermarkHold(); - if (watermarkHold != null && watermarkHold.getMillis() < millis) { - millis = watermarkHold.getMillis(); - } - } - - if (mark.getTimestamp() < millis) { - millis = mark.getTimestamp(); - } - - context.setCurrentOutputWatermark(new Instant(millis)); - - // Don't forget to re-emit the watermark for further operators down the line. - // This is critical for jobs with multiple aggregation steps. - // Imagine a job with a groupByKey() on key K1, followed by a map() that changes - // the key K1 to K2, and another groupByKey() on K2. In this case, if the watermark - // is not re-emitted, the second aggregation would never be triggered, and no result - // will be produced. - output.emitWatermark(new Watermark(millis)); - } - - @Override - public void close() throws Exception { - super.close(); - } - - private void registerActiveTimer(K key, TimerInternals.TimerData timer) { - Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); - if (timersForKey == null) { - timersForKey = new HashSet<>(); - } - timersForKey.add(timer); - activeTimers.put(key, timersForKey); - } - - private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) { - Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); - if (timersForKey != null) { - timersForKey.remove(timer); - if (timersForKey.isEmpty()) { - activeTimers.remove(key); - } else { - activeTimers.put(key, timersForKey); - } - } - } - - /** - * Returns the list of timers that are ready to fire. These are the timers - * that are registered to be triggered at a time before the current watermark. - * We keep these timers in a Set, so that they are deduplicated, as the same - * timer can be registered multiple times. - */ - private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) { - - // we keep the timers to return in a different list and launch them later - // because we cannot prevent a trigger from registering another trigger, - // which would lead to concurrent modification exception. - Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create(); - - Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); - - Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator(); - while (timerIt.hasNext()) { - TimerInternals.TimerData timerData = timerIt.next(); - if (timerData.getTimestamp().isBefore(currentWatermark)) { - toFire.put(keyWithTimers.getKey(), timerData); - timerIt.remove(); - } - } - - if (keyWithTimers.getValue().isEmpty()) { - it.remove(); - } - } - return toFire; - } - - /** - * Gets the state associated with the specified key. - * - * @param key the key whose state we want. - * @return The {@link FlinkStateInternals} - * associated with that key. - */ - private FlinkStateInternals<K> getStateInternalsForKey(K key) { - FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key); - if (stateInternals == null) { - Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); - OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn(); - stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn); - perKeyStateInternals.put(key, stateInternals); - } - return stateInternals; - } - - private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> { - @Override - public void setTimer(TimerData timerKey) { - registerActiveTimer(context.element().key(), timerKey); - } - - @Override - public void deleteTimer(TimerData timerKey) { - unregisterActiveTimer(context.element().key(), timerKey); - } - } - - private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext { - - private final FlinkTimerInternals timerInternals; - - private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector; - - private FlinkStateInternals<K> stateInternals; - - private KeyedWorkItem<K, VIN> element; - - public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function, - TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector, - FlinkTimerInternals timerInternals) { - function.super(); - super.setupDelegateAggregators(); - - this.collector = Preconditions.checkNotNull(outCollector); - this.timerInternals = Preconditions.checkNotNull(timerInternals); - } - - public void setElement(KeyedWorkItem<K, VIN> element, - FlinkStateInternals<K> stateForKey) { - this.element = element; - this.stateInternals = stateForKey; - } - - public void setCurrentInputWatermark(Instant watermark) { - this.timerInternals.setCurrentInputWatermark(watermark); - } - - public void setCurrentOutputWatermark(Instant watermark) { - this.timerInternals.setCurrentOutputWatermark(watermark); - } - - @Override - public KeyedWorkItem<K, VIN> element() { - return this.element; - } - - @Override - public Instant timestamp() { - throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems."); - } - - @Override - public PipelineOptions getPipelineOptions() { - // TODO: PipelineOptions need to be available on the workers. - // Ideally they are captured as part of the pipeline. - // For now, construct empty options so that StateContexts.createFromComponents - // will yield a valid StateContext, which is needed to support the StateContext.window(). - if (options == null) { - options = new PipelineOptions() { - @Override - public <T extends PipelineOptions> T as(Class<T> kls) { - return null; - } - - @Override - public <T extends PipelineOptions> T cloneAs(Class<T> kls) { - return null; - } - - @Override - public Class<? extends PipelineRunner<?>> getRunner() { - return null; - } - - @Override - public void setRunner(Class<? extends PipelineRunner<?>> kls) { - - } - - @Override - public CheckEnabled getStableUniqueNames() { - return null; - } - - @Override - public void setStableUniqueNames(CheckEnabled enabled) { - } - }; - } - return options; - } - - @Override - public void output(KV<K, VOUT> output) { - throw new UnsupportedOperationException( - "output() is not available when processing KeyedWorkItems."); - } - - @Override - public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) { - throw new UnsupportedOperationException( - "outputWithTimestamp() is not available when processing KeyedWorkItems."); - } - - @Override - public PaneInfo pane() { - throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems."); - } - - @Override - public BoundedWindow window() { - throw new UnsupportedOperationException( - "window() is not available when processing KeyedWorkItems."); - } - - @Override - public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() { - return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() { - - @Override - public com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() { - return stateInternals; - } - - @Override - public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - // TODO: No need to represent timestamp twice. - collector.setAbsoluteTimestamp(timestamp.getMillis()); - collector.collect(WindowedValue.of(output, timestamp, windows, pane)); - - } - - @Override - public TimerInternals timerInternals() { - return timerInternals; - } - - @Override - public Collection<? extends BoundedWindow> windows() { - throw new UnsupportedOperationException("windows() is not available in Streaming mode."); - } - - @Override - public PaneInfo pane() { - throw new UnsupportedOperationException("pane() is not available in Streaming mode."); - } - - @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - throw new RuntimeException("writePCollectionViewData() not available in Streaming mode."); - } - - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - throw new RuntimeException("sideInput() is not available in Streaming mode."); - } - }; - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - throw new RuntimeException("sideInput() is not supported in Streaming mode."); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - // ignore the side output, this can happen when a user does not register - // side outputs but then outputs using a freshly created TupleTag. - throw new RuntimeException("sideOutput() is not available when grouping by window."); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - sideOutput(tag, output); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - Accumulator acc = getRuntimeContext().getAccumulator(name); - if (acc != null) { - AccumulatorHelper.compareAccumulatorTypes(name, - SerializableFnAggregatorWrapper.class, acc.getClass()); - return (Aggregator<AggInputT, AggOutputT>) acc; - } - - SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator = - new SerializableFnAggregatorWrapper<>(combiner); - getRuntimeContext().addAccumulator(name, accumulator); - return accumulator; - } - } - - ////////////// Checkpointing implementation //////////////// - - @Override - public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { - StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); - AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); - StateCheckpointWriter writer = StateCheckpointWriter.create(out); - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - - // checkpoint the timers - StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder); - - // checkpoint the state - StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder); - - // checkpoint the timerInternals - context.timerInternals.encodeTimerInternals(context, writer, - inputKvCoder, windowingStrategy.getWindowFn().windowCoder()); - - taskState.setOperatorState(out.closeAndGetHandle()); - return taskState; - } - - @Override - public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception { - super.restoreState(taskState, recoveryTimestamp); - - final ClassLoader userClassloader = getUserCodeClassloader(); - - Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - - @SuppressWarnings("unchecked") - StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState(); - DataInputView in = inputState.getState(userClassloader); - StateCheckpointReader reader = new StateCheckpointReader(in); - - // restore the timers - this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder); - - // restore the state - this.perKeyStateInternals = StateCheckpointUtils.decodeState( - reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader); - - // restore the timerInternals. - this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder); - } + extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>> + implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> { + + private static final long serialVersionUID = 1L; + + private transient PipelineOptions options; + + private transient CoderRegistry coderRegistry; + + private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator; + + private ProcessContext context; + + private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy; + + private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn; + + private final KvCoder<K, VIN> inputKvCoder; + + /** + * State is kept <b>per-key</b>. This data structure keeps this mapping between an active key, i.e. a + * key whose elements are currently waiting to be processed, and its associated state. + */ + private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(); + + /** + * Timers waiting to be processed. + */ + private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); + + private FlinkTimerInternals timerInternals = new FlinkTimerInternals(); + + /** + * Creates an DataStream where elements are grouped in windows based on the specified windowing strategy. + * This method assumes that <b>elements are already grouped by key</b>. + * <p/> + * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)} + * is that this method assumes that a combiner function is provided + * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). + * A combiner helps at increasing the speed and, in most of the cases, reduce the per-window state. + * + * @param options the general job configuration options. + * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. + * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key. + * @param combiner the combiner to be used. + * @param outputKvCoder the type of the output values. + */ + public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create( + PipelineOptions options, + PCollection input, + KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey, + Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner, + KvCoder<K, VOUT> outputKvCoder) { + Preconditions.checkNotNull(options); + + KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder(); + FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options, + input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner); + + Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of( + outputKvCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo = + new CoderTypeInformation<>(windowedOutputElemCoder); + + DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey + .transform("GroupByWindowWithCombiner", + new CoderTypeInformation<>(outputKvCoder), + windower) + .returns(outputTypeInfo); + + return groupedByKeyAndWindow; + } + + /** + * Creates an DataStream where elements are grouped in windows based on the specified windowing strategy. + * This method assumes that <b>elements are already grouped by key</b>. + * <p/> + * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)} + * is that this method assumes no combiner function + * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). + * + * @param options the general job configuration options. + * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. + * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key. + */ + public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable( + PipelineOptions options, + PCollection input, + KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) { + Preconditions.checkNotNull(options); + + KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder(); + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); + + FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options, + input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null); + + Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder); + KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder); + + Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of( + outputElemCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo = + new CoderTypeInformation<>(windowedOutputElemCoder); + + DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey + .transform("GroupByWindow", + new CoderTypeInformation<>(windowedOutputElemCoder), + windower) + .returns(outputTypeInfo); + + return groupedByKeyAndWindow; + } + + public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper + createForTesting(PipelineOptions options, + CoderRegistry registry, + WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy, + KvCoder<K, VIN> inputCoder, + Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) { + Preconditions.checkNotNull(options); + + return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner); + } + + private FlinkGroupAlsoByWindowWrapper(PipelineOptions options, + CoderRegistry registry, + WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy, + KvCoder<K, VIN> inputCoder, + Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) { + Preconditions.checkNotNull(options); + + this.options = Preconditions.checkNotNull(options); + this.coderRegistry = Preconditions.checkNotNull(registry); + this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder(); + this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy(); + this.combineFn = combiner; + this.operator = createGroupAlsoByWindowOperator(); + this.chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + public void open() throws Exception { + super.open(); + this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals); + } + + /** + * Create the adequate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}, + * <b> if not already created</b>. + * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then + * a function with that combiner is created, so that elements are combined as they arrive. This is + * done for speed and (in most of the cases) for reduction of the per-window state. + */ + private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() { + if (this.operator == null) { + if (this.combineFn == null) { + // Thus VOUT == Iterable<VIN> + Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); + + this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create( + (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder)); + } else { + Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder(); + + AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn + .withInputCoder(combineFn, coderRegistry, inputKvCoder); + + this.operator = GroupAlsoByWindowViaWindowSetDoFn.create( + (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn)); + } + } + return this.operator; + } + + private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception { + context.setElement(workItem, getStateInternalsForKey(workItem.key())); + + // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded. + operator.startBundle(context); + operator.processElement(context); + operator.finishBundle(context); + } + + @Override + public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception { + ArrayList<WindowedValue<VIN>> elements = new ArrayList<>(); + elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(), + element.getValue().getWindows(), element.getValue().getPane())); + processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements)); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + context.setCurrentInputWatermark(new Instant(mark.getTimestamp())); + + Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp()); + if (!timers.isEmpty()) { + for (K key : timers.keySet()) { + processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key))); + } + } + + /** + * This is to take into account the different semantics of the Watermark in Flink and + * in Dataflow. To understand the reasoning behind the Dataflow semantics and its + * watermark holding logic, see the documentation of + * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)} + * */ + long millis = Long.MAX_VALUE; + for (FlinkStateInternals state : perKeyStateInternals.values()) { + Instant watermarkHold = state.getWatermarkHold(); + if (watermarkHold != null && watermarkHold.getMillis() < millis) { + millis = watermarkHold.getMillis(); + } + } + + if (mark.getTimestamp() < millis) { + millis = mark.getTimestamp(); + } + + context.setCurrentOutputWatermark(new Instant(millis)); + + // Don't forget to re-emit the watermark for further operators down the line. + // This is critical for jobs with multiple aggregation steps. + // Imagine a job with a groupByKey() on key K1, followed by a map() that changes + // the key K1 to K2, and another groupByKey() on K2. In this case, if the watermark + // is not re-emitted, the second aggregation would never be triggered, and no result + // will be produced. + output.emitWatermark(new Watermark(millis)); + } + + @Override + public void close() throws Exception { + super.close(); + } + + private void registerActiveTimer(K key, TimerInternals.TimerData timer) { + Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); + if (timersForKey == null) { + timersForKey = new HashSet<>(); + } + timersForKey.add(timer); + activeTimers.put(key, timersForKey); + } + + private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) { + Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); + if (timersForKey != null) { + timersForKey.remove(timer); + if (timersForKey.isEmpty()) { + activeTimers.remove(key); + } else { + activeTimers.put(key, timersForKey); + } + } + } + + /** + * Returns the list of timers that are ready to fire. These are the timers + * that are registered to be triggered at a time before the current watermark. + * We keep these timers in a Set, so that they are deduplicated, as the same + * timer can be registered multiple times. + */ + private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) { + + // we keep the timers to return in a different list and launch them later + // because we cannot prevent a trigger from registering another trigger, + // which would lead to concurrent modification exception. + Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create(); + + Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); + + Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator(); + while (timerIt.hasNext()) { + TimerInternals.TimerData timerData = timerIt.next(); + if (timerData.getTimestamp().isBefore(currentWatermark)) { + toFire.put(keyWithTimers.getKey(), timerData); + timerIt.remove(); + } + } + + if (keyWithTimers.getValue().isEmpty()) { + it.remove(); + } + } + return toFire; + } + + /** + * Gets the state associated with the specified key. + * + * @param key the key whose state we want. + * @return The {@link FlinkStateInternals} + * associated with that key. + */ + private FlinkStateInternals<K> getStateInternalsForKey(K key) { + FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key); + if (stateInternals == null) { + Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); + OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn(); + stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn); + perKeyStateInternals.put(key, stateInternals); + } + return stateInternals; + } + + private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> { + @Override + public void setTimer(TimerData timerKey) { + registerActiveTimer(context.element().key(), timerKey); + } + + @Override + public void deleteTimer(TimerData timerKey) { + unregisterActiveTimer(context.element().key(), timerKey); + } + } + + private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext { + + private final FlinkTimerInternals timerInternals; + + private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector; + + private FlinkStateInternals<K> stateInternals; + + private KeyedWorkItem<K, VIN> element; + + public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function, + TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector, + FlinkTimerInternals timerInternals) { + function.super(); + super.setupDelegateAggregators(); + + this.collector = Preconditions.checkNotNull(outCollector); + this.timerInternals = Preconditions.checkNotNull(timerInternals); + } + + public void setElement(KeyedWorkItem<K, VIN> element, + FlinkStateInternals<K> stateForKey) { + this.element = element; + this.stateInternals = stateForKey; + } + + public void setCurrentInputWatermark(Instant watermark) { + this.timerInternals.setCurrentInputWatermark(watermark); + } + + public void setCurrentOutputWatermark(Instant watermark) { + this.timerInternals.setCurrentOutputWatermark(watermark); + } + + @Override + public KeyedWorkItem<K, VIN> element() { + return this.element; + } + + @Override + public Instant timestamp() { + throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems."); + } + + @Override + public PipelineOptions getPipelineOptions() { + // TODO: PipelineOptions need to be available on the workers. + // Ideally they are captured as part of the pipeline. + // For now, construct empty options so that StateContexts.createFromComponents + // will yield a valid StateContext, which is needed to support the StateContext.window(). + if (options == null) { + options = new PipelineOptions() { + @Override + public <T extends PipelineOptions> T as(Class<T> kls) { + return null; + } + + @Override + public <T extends PipelineOptions> T cloneAs(Class<T> kls) { + return null; + } + + @Override + public Class<? extends PipelineRunner<?>> getRunner() { + return null; + } + + @Override + public void setRunner(Class<? extends PipelineRunner<?>> kls) { + + } + + @Override + public CheckEnabled getStableUniqueNames() { + return null; + } + + @Override + public void setStableUniqueNames(CheckEnabled enabled) { + } + }; + } + return options; + } + + @Override + public void output(KV<K, VOUT> output) { + throw new UnsupportedOperationException( + "output() is not available when processing KeyedWorkItems."); + } + + @Override + public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) { + throw new UnsupportedOperationException( + "outputWithTimestamp() is not available when processing KeyedWorkItems."); + } + + @Override + public PaneInfo pane() { + throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems."); + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException( + "window() is not available when processing KeyedWorkItems."); + } + + @Override + public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() { + return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() { + + @Override + public com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() { + return stateInternals; + } + + @Override + public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + // TODO: No need to represent timestamp twice. + collector.setAbsoluteTimestamp(timestamp.getMillis()); + collector.collect(WindowedValue.of(output, timestamp, windows, pane)); + + } + + @Override + public TimerInternals timerInternals() { + return timerInternals; + } + + @Override + public Collection<? extends BoundedWindow> windows() { + throw new UnsupportedOperationException("windows() is not available in Streaming mode."); + } + + @Override + public PaneInfo pane() { + throw new UnsupportedOperationException("pane() is not available in Streaming mode."); + } + + @Override + public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + throw new RuntimeException("writePCollectionViewData() not available in Streaming mode."); + } + + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + throw new RuntimeException("sideInput() is not available in Streaming mode."); + } + }; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + throw new RuntimeException("sideInput() is not supported in Streaming mode."); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + // ignore the side output, this can happen when a user does not register + // side outputs but then outputs using a freshly created TupleTag. + throw new RuntimeException("sideOutput() is not available when grouping by window."); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + sideOutput(tag, output); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + Accumulator acc = getRuntimeContext().getAccumulator(name); + if (acc != null) { + AccumulatorHelper.compareAccumulatorTypes(name, + SerializableFnAggregatorWrapper.class, acc.getClass()); + return (Aggregator<AggInputT, AggOutputT>) acc; + } + + SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator = + new SerializableFnAggregatorWrapper<>(combiner); + getRuntimeContext().addAccumulator(name, accumulator); + return accumulator; + } + } + + ////////////// Checkpointing implementation //////////////// + + @Override + public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { + StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); + AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); + StateCheckpointWriter writer = StateCheckpointWriter.create(out); + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + + // checkpoint the timers + StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder); + + // checkpoint the state + StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder); + + // checkpoint the timerInternals + context.timerInternals.encodeTimerInternals(context, writer, + inputKvCoder, windowingStrategy.getWindowFn().windowCoder()); + + taskState.setOperatorState(out.closeAndGetHandle()); + return taskState; + } + + @Override + public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception { + super.restoreState(taskState, recoveryTimestamp); + + final ClassLoader userClassloader = getUserCodeClassloader(); + + Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + + @SuppressWarnings("unchecked") + StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState(); + DataInputView in = inputState.getState(userClassloader); + StateCheckpointReader reader = new StateCheckpointReader(in); + + // restore the timers + this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder); + + // restore the state + this.perKeyStateInternals = StateCheckpointUtils.decodeState( + reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader); + + // restore the timerInternals. + this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java index 24f6d40..d01cf81 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java @@ -34,31 +34,31 @@ import org.apache.flink.streaming.api.datastream.KeyedStream; * */ public class FlinkGroupByKeyWrapper { - /** - * Just an auxiliary interface to bypass the fact that java anonymous classes cannot implement - * multiple interfaces. - */ - private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> { - } + /** + * Just an auxiliary interface to bypass the fact that java anonymous classes cannot implement + * multiple interfaces. + */ + private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> { + } - public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) { - final Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder); - final boolean isKeyVoid = keyCoder instanceof VoidCoder; + public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) { + final Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder); + final boolean isKeyVoid = keyCoder instanceof VoidCoder; - return inputDataStream.keyBy( - new KeySelectorWithQueryableResultType<K, V>() { + return inputDataStream.keyBy( + new KeySelectorWithQueryableResultType<K, V>() { - @Override - public K getKey(WindowedValue<KV<K, V>> value) throws Exception { - return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE : - value.getValue().getKey(); - } + @Override + public K getKey(WindowedValue<KV<K, V>> value) throws Exception { + return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE : + value.getValue().getKey(); + } - @Override - public TypeInformation<K> getProducedType() { - return keyTypeInfo; - } - }); - } + @Override + public TypeInformation<K> getProducedType() { + return keyTypeInfo; + } + }); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java index d65cbc3..066a55c 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java @@ -33,43 +33,43 @@ import java.util.Map; * */ public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> { - private final TupleTag<?> mainTag; - private final Map<TupleTag<?>, Integer> outputLabels; + private final TupleTag<?> mainTag; + private final Map<TupleTag<?>, Integer> outputLabels; - public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) { - super(options, windowingStrategy, doFn); - this.mainTag = Preconditions.checkNotNull(mainTag); - this.outputLabels = Preconditions.checkNotNull(tagsToLabels); - } + public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) { + super(options, windowingStrategy, doFn); + this.mainTag = Preconditions.checkNotNull(mainTag); + this.outputLabels = Preconditions.checkNotNull(tagsToLabels); + } - @Override - public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) { - checkTimestamp(inElement, timestamp); - Integer index = outputLabels.get(mainTag); - collector.collect(makeWindowedValue( - new RawUnionValue(index, output), - timestamp, - inElement.getWindows(), - inElement.getPane())); - } + @Override + public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) { + checkTimestamp(inElement, timestamp); + Integer index = outputLabels.get(mainTag); + collector.collect(makeWindowedValue( + new RawUnionValue(index, output), + timestamp, + inElement.getWindows(), + inElement.getPane())); + } - @Override - public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) { - checkTimestamp(inElement, timestamp); - Integer index = outputLabels.get(tag); - if (index != null) { - collector.collect(makeWindowedValue( - new RawUnionValue(index, output), - timestamp, - inElement.getWindows(), - inElement.getPane())); - } - } + @Override + public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) { + checkTimestamp(inElement, timestamp); + Integer index = outputLabels.get(tag); + if (index != null) { + collector.collect(makeWindowedValue( + new RawUnionValue(index, output), + timestamp, + inElement.getWindows(), + inElement.getPane())); + } + } - @Override - public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) { - throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " + - "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " + - "is not available in this class."); - } + @Override + public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) { + throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " + + "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " + + "is not available in this class."); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java index b0d8a76..b3a7090 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java @@ -35,64 +35,64 @@ import java.util.*; * */ public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> { - public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) { - super(options, windowingStrategy, doFn); - } + public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) { + super(options, windowingStrategy, doFn); + } - @Override - public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) { - checkTimestamp(inElement, timestamp); - collector.collect(makeWindowedValue( - output, - timestamp, - inElement.getWindows(), - inElement.getPane())); - } + @Override + public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) { + checkTimestamp(inElement, timestamp); + collector.collect(makeWindowedValue( + output, + timestamp, + inElement.getWindows(), + inElement.getPane())); + } - @Override - public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) { - // ignore the side output, this can happen when a user does not register - // side outputs but then outputs using a freshly created TupleTag. - throw new RuntimeException("sideOutput() not not available in ParDo.Bound()."); - } + @Override + public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) { + // ignore the side output, this can happen when a user does not register + // side outputs but then outputs using a freshly created TupleTag. + throw new RuntimeException("sideOutput() not not available in ParDo.Bound()."); + } - @Override - public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) { - return new WindowingInternals<IN, OUT>() { - @Override - public StateInternals stateInternals() { - throw new NullPointerException("StateInternals are not available for ParDo.Bound()."); - } + @Override + public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) { + return new WindowingInternals<IN, OUT>() { + @Override + public StateInternals stateInternals() { + throw new NullPointerException("StateInternals are not available for ParDo.Bound()."); + } - @Override - public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - collector.collect(makeWindowedValue(output, timestamp, windows, pane)); - } + @Override + public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + collector.collect(makeWindowedValue(output, timestamp, windows, pane)); + } - @Override - public TimerInternals timerInternals() { - throw new NullPointerException("TimeInternals are not available for ParDo.Bound()."); - } + @Override + public TimerInternals timerInternals() { + throw new NullPointerException("TimeInternals are not available for ParDo.Bound()."); + } - @Override - public Collection<? extends BoundedWindow> windows() { - return inElement.getWindows(); - } + @Override + public Collection<? extends BoundedWindow> windows() { + return inElement.getWindows(); + } - @Override - public PaneInfo pane() { - return inElement.getPane(); - } + @Override + public PaneInfo pane() { + return inElement.getPane(); + } - @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode."); - } + @Override + public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode."); + } - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - throw new RuntimeException("sideInput() not implemented."); - } - }; - } + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + throw new RuntimeException("sideInput() not implemented."); + } + }; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java index dc8e05a..39770c9 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java @@ -34,30 +34,30 @@ import java.util.List; */ public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN, WindowedValue<OUT>> { - private final List<byte[]> elements; - private final Coder<OUT> coder; - - public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) { - this.elements = elements; - this.coder = coder; - } - - @Override - public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception { - - @SuppressWarnings("unchecked") - OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE; - for (byte[] element : elements) { - ByteArrayInputStream bai = new ByteArrayInputStream(element); - OUT outValue = coder.decode(bai, Coder.Context.OUTER); - - if (outValue == null) { - out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); - } else { - out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); - } - } - - out.close(); - } + private final List<byte[]> elements; + private final Coder<OUT> coder; + + public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) { + this.elements = elements; + this.coder = coder; + } + + @Override + public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception { + + @SuppressWarnings("unchecked") + OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE; + for (byte[] element : elements) { + ByteArrayInputStream bai = new ByteArrayInputStream(element); + OUT outValue = coder.decode(bai, Coder.Context.OUTER); + + if (outValue == null) { + out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + } else { + out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + } + } + + out.close(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java index 699d256..4d6f4e2 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -31,50 +31,50 @@ import java.util.List; * */ public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> { - private final PipelineOptions options; - private final RichParallelSourceFunction<T> flinkSource; + private final PipelineOptions options; + private final RichParallelSourceFunction<T> flinkSource; - public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) { - if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } - options = Preconditions.checkNotNull(pipelineOptions); - flinkSource = Preconditions.checkNotNull(source); - validate(); - } + public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) { + if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + options = Preconditions.checkNotNull(pipelineOptions); + flinkSource = Preconditions.checkNotNull(source); + validate(); + } - public RichParallelSourceFunction<T> getFlinkSource() { - return this.flinkSource; - } + public RichParallelSourceFunction<T> getFlinkSource() { + return this.flinkSource; + } - @Override - public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } + @Override + public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } - @Override - public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } + @Override + public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } - @Nullable - @Override - public Coder<C> getCheckpointMarkCoder() { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } + @Nullable + @Override + public Coder<C> getCheckpointMarkCoder() { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } - @Override - public void validate() { - Preconditions.checkNotNull(options); - Preconditions.checkNotNull(flinkSource); - if(!options.getRunner().equals(FlinkPipelineRunner.class)) { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } - } + @Override + public void validate() { + Preconditions.checkNotNull(options); + Preconditions.checkNotNull(flinkSource); + if(!options.getRunner().equals(FlinkPipelineRunner.class)) { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + } - @Override - public Coder<T> getDefaultOutputCoder() { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } + @Override + public Coder<T> getDefaultOutputCoder() { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } }
