http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java index 48ad387..d9fa9f0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java @@ -19,10 +19,14 @@ package org.apache.flink.streaming.runtime.operators.windowing; import com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; @@ -69,7 +73,7 @@ import static java.util.Objects.requireNonNull; * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. */ public class NonKeyedWindowOperator<IN, OUT, W extends Window> - extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>> + extends AbstractUdfStreamOperator<OUT, AllWindowFunction<Iterable<IN>, OUT, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -145,7 +149,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory, - AllWindowFunction<IN, OUT, W> windowFunction, + AllWindowFunction<Iterable<IN>, OUT, W> windowFunction, Trigger<? super IN, ? super W> trigger) { super(windowFunction); @@ -413,29 +417,72 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> } } - @SuppressWarnings("unchecked") - public <S extends Serializable> ValueState<S> getKeyValueState(final String name, final S defaultState) { - return new ValueState<S>() { + @Override + public <S extends Serializable> ValueState<S> getKeyValueState(String name, + Class<S> stateType, + S defaultState) { + requireNonNull(stateType, "The state type class must not be null"); + + TypeInformation<S> typeInfo; + try { + typeInfo = TypeExtractor.getForClass(stateType); + } + catch (Exception e) { + throw new RuntimeException("Cannot analyze type '" + stateType.getName() + + "' from the class alone, due to generic type parameters. " + + "Please specify the TypeInformation directly.", e); + } + + return getKeyValueState(name, typeInfo, defaultState); + } + + @Override + public <S extends Serializable> ValueState<S> getKeyValueState(String name, + TypeInformation<S> stateType, + S defaultState) { + + requireNonNull(name, "The name of the state must not be null"); + requireNonNull(stateType, "The state type information must not be null"); + + ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(name, defaultState, stateType.createSerializer(getExecutionConfig())); + return getPartitionedState(stateDesc); + } + + @Override + @SuppressWarnings("rawtypes, unchecked") + public <S extends State> S getPartitionedState(final StateDescriptor<S> stateDescriptor) { + if (!(stateDescriptor instanceof ValueStateDescriptor)) { + throw new UnsupportedOperationException("NonKeyedWindowOperator Triggers only " + + "support ValueState."); + } + @SuppressWarnings("unchecked") + final ValueStateDescriptor<?> valueStateDescriptor = (ValueStateDescriptor<?>) stateDescriptor; + ValueState valueState = new ValueState() { @Override - public S value() throws IOException { - Serializable value = state.get(name); + public Object value() throws IOException { + Object value = state.get(stateDescriptor.getName()); if (value == null) { - state.put(name, defaultState); - value = defaultState; + value = valueStateDescriptor.getDefaultValue(); + state.put(stateDescriptor.getName(), (Serializable) value); } - return (S) value; + return value; } @Override - public void update(S value) throws IOException { - state.put(name, value); + public void update(Object value) throws IOException { + if (!(value instanceof Serializable)) { + throw new UnsupportedOperationException( + "Value state of NonKeyedWindowOperator must be serializable."); + } + state.put(stateDescriptor.getName(), (Serializable) value); } @Override public void clear() { - state.remove(name); + state.remove(stateDescriptor.getName()); } }; + return (S) valueState; } @Override
http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index fd39481..5109dae 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -19,11 +19,16 @@ package org.apache.flink.streaming.runtime.operators.windowing; import com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; @@ -37,25 +42,18 @@ import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; -import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.ObjectInputStream; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import static java.util.Objects.requireNonNull; @@ -86,8 +84,8 @@ import static java.util.Objects.requireNonNull; * @param <OUT> The type of elements emitted by the {@code WindowFunction}. * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. */ -public class WindowOperator<K, IN, OUT, W extends Window> - extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, K, W>> +public class WindowOperator<K, IN, ACC, OUT, W extends Window> + extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -98,52 +96,42 @@ public class WindowOperator<K, IN, OUT, W extends Window> // Configuration values and user functions // ------------------------------------------------------------------------ - private final WindowAssigner<? super IN, W> windowAssigner; + protected final WindowAssigner<? super IN, W> windowAssigner; - private final KeySelector<IN, K> keySelector; + protected final KeySelector<IN, K> keySelector; - private final Trigger<? super IN, ? super W> trigger; + protected final Trigger<? super IN, ? super W> trigger; - private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory; + protected final StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor; /** * If this is true. The current processing time is set as the timestamp of incoming elements. * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} * if eviction should happen based on processing time. */ - private boolean setProcessingTime = false; + protected boolean setProcessingTime = false; /** * This is used to copy the incoming element because it can be put into several window * buffers. */ - private TypeSerializer<IN> inputSerializer; + protected TypeSerializer<IN> inputSerializer; /** * For serializing the key in checkpoints. */ - private final TypeSerializer<K> keySerializer; + protected final TypeSerializer<K> keySerializer; /** * For serializing the window in checkpoints. */ - private final TypeSerializer<W> windowSerializer; + protected final TypeSerializer<W> windowSerializer; // ------------------------------------------------------------------------ // State that is not checkpointed // ------------------------------------------------------------------------ /** - * Processing time timers that are currently in-flight. - */ - private transient Map<Long, Set<Context>> processingTimeTimers; - - /** - * Current waiting watermark callbacks. - */ - private transient Map<Long, Set<Context>> watermarkTimers; - - /** * This is given to the {@code WindowFunction} for emitting elements with a given timestamp. */ protected transient TimestampedCollector<OUT> timestampedCollector; @@ -154,15 +142,23 @@ public class WindowOperator<K, IN, OUT, W extends Window> */ protected transient long currentWatermark = -1L; + protected transient Context context = new Context(null, null); + // ------------------------------------------------------------------------ // State that needs to be checkpointed // ------------------------------------------------------------------------ /** - * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer} - * and a {@code TriggerContext} that stores the {@code Trigger} for that pane. + * Processing time timers that are currently in-flight. + */ + protected transient Set<Timer<K, W>> processingTimeTimers; + protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue; + + /** + * Current waiting watermark callbacks. */ - protected transient Map<K, Map<W, Context>> windows; + protected transient Set<Timer<K, W>> watermarkTimers; + protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue; /** * Creates a new {@code WindowOperator} based on the given policies and user functions. @@ -171,8 +167,8 @@ public class WindowOperator<K, IN, OUT, W extends Window> TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, - WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory, - WindowFunction<IN, OUT, K, W> windowFunction, + StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor, + WindowFunction<ACC, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger) { super(windowFunction); @@ -182,7 +178,7 @@ public class WindowOperator<K, IN, OUT, W extends Window> this.keySelector = requireNonNull(keySelector); this.keySerializer = requireNonNull(keySerializer); - this.windowBufferFactory = requireNonNull(windowBufferFactory); + this.windowStateDescriptor = windowStateDescriptor; this.trigger = requireNonNull(trigger); setChainingStrategy(ChainingStrategy.ALWAYS); @@ -209,159 +205,100 @@ public class WindowOperator<K, IN, OUT, W extends Window> throw new IllegalStateException("Input serializer was not set."); } - windowBufferFactory.setRuntimeContext(getRuntimeContext()); - windowBufferFactory.open(getUserFunctionParameters()); - - // these could already be initialized from restoreState() if (watermarkTimers == null) { - watermarkTimers = new HashMap<>(); + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); } if (processingTimeTimers == null) { - processingTimeTimers = new HashMap<>(); + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); } - if (windows == null) { - windows = new HashMap<>(); - } - - // re-register timers that this window context had set - for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) { - Map<W, Context> keyWindows = entry.getValue(); - for (Context context: keyWindows.values()) { - if (context.processingTimeTimer > 0) { - Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer); - if (triggers == null) { - getRuntimeContext().registerTimer(context.processingTimeTimer, WindowOperator.this); - triggers = new HashSet<>(); - processingTimeTimers.put(context.processingTimeTimer, triggers); - } - triggers.add(context); - } - if (context.watermarkTimer > 0) { - Set<Context> triggers = watermarkTimers.get(context.watermarkTimer); - if (triggers == null) { - triggers = new HashSet<>(); - watermarkTimers.put(context.watermarkTimer, triggers); - } - triggers.add(context); - } - } - } + context = new Context(null, null); } @Override - public final void dispose() { - super.dispose(); - windows.clear(); - try { - windowBufferFactory.close(); - } catch (Exception e) { - throw new RuntimeException("Error while closing WindowBufferFactory.", e); - } + public final void close() throws Exception { + super.close(); } @Override @SuppressWarnings("unchecked") - public final void processElement(StreamRecord<IN> element) throws Exception { + public void processElement(StreamRecord<IN> element) throws Exception { if (setProcessingTime) { element.replace(element.getValue(), System.currentTimeMillis()); } Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp()); - K key = keySelector.getKey(element.getValue()); - - Map<W, Context> keyWindows = windows.get(key); - if (keyWindows == null) { - keyWindows = new HashMap<>(); - windows.put(key, keyWindows); - } + K key = (K) getStateBackend().getCurrentKey(); for (W window: elementWindows) { - Context context = keyWindows.get(window); - if (context == null) { - WindowBuffer<IN> windowBuffer = windowBufferFactory.create(); - context = new Context(key, window, windowBuffer); - keyWindows.put(window, context); - } - context.windowBuffer.storeElement(element); - Trigger.TriggerResult triggerResult = context.onElement(element); - processTriggerResult(triggerResult, key, window); - } - } - - protected void emitWindow(Context context) throws Exception { - timestampedCollector.setTimestamp(context.window.maxTimestamp()); + MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer, + windowStateDescriptor); + windowState.add(element.getValue()); - if (context.windowBuffer.size() > 0) { - setKeyContextElement1(context.windowBuffer.getElements().iterator().next()); + context.key = key; + context.window = window; + Trigger.TriggerResult triggerResult = context.onElement(element); - userFunction.apply(context.key, - context.window, - context.windowBuffer.getUnpackedElements(), - timestampedCollector); + processTriggerResult(triggerResult, key, window); } } - private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception { + protected void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception { if (!triggerResult.isFire() && !triggerResult.isPurge()) { // do nothing return; } - Context context; - Map<W, Context> keyWindows = windows.get(key); - if (keyWindows == null) { - LOG.debug("Window {} for key {} already gone.", window, key); - return; - } - - if (triggerResult.isPurge()) { - context = keyWindows.remove(window); - if (keyWindows.isEmpty()) { - windows.remove(key); - } - } else { - context = keyWindows.get(window); - } - if (context == null) { - LOG.debug("Window {} for key {} already gone.", window, key); - return; - } if (triggerResult.isFire()) { - emitWindow(context); + timestampedCollector.setTimestamp(window.maxTimestamp()); + + setKeyContext(key); + + MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer, + windowStateDescriptor); + + ACC contents = windowState.get(); + + userFunction.apply(context.key, context.window, contents, timestampedCollector); + + if (triggerResult.isPurge()) { + windowState.clear(); + } + } else if (triggerResult.isPurge()) { + setKeyContext(key); + MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer, + windowStateDescriptor); + windowState.clear(); } } @Override public final void processWatermark(Watermark mark) throws Exception { - List<Set<Context>> toTrigger = new ArrayList<>(); - Iterator<Map.Entry<Long, Set<Context>>> it = watermarkTimers.entrySet().iterator(); + boolean fire; - while (it.hasNext()) { - Map.Entry<Long, Set<Context>> triggers = it.next(); - if (triggers.getKey() <= mark.getTimestamp()) { - toTrigger.add(triggers.getValue()); - it.remove(); - } - } + do { + Timer<K, W> timer = watermarkTimersQueue.peek(); + if (timer != null && timer.timestamp <= mark.getTimestamp()) { + fire = true; + + watermarkTimers.remove(timer); + watermarkTimersQueue.remove(); - for (Set<Context> ctxs: toTrigger) { - for (Context ctx: ctxs) { - // double check the time. it can happen that the trigger registers a new timer, - // in that case the entry is left in the watermarkTimers set for performance reasons. - // We have to check here whether the entry in the set still reflects the - // currently set timer in the Context. - if (ctx.watermarkTimer <= mark.getTimestamp()) { - Trigger.TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer); - processTriggerResult(triggerResult, ctx.key, ctx.window); - } + context.key = timer.key; + context.window = timer.window; + Trigger.TriggerResult triggerResult = context.onEventTime(mark.getTimestamp()); + processTriggerResult(triggerResult, context.key, context.window); + } else { + fire = false; } - } + } while (fire); output.emitWatermark(mark); @@ -370,206 +307,173 @@ public class WindowOperator<K, IN, OUT, W extends Window> @Override public final void trigger(long time) throws Exception { - List<Set<Context>> toTrigger = new ArrayList<>(); + boolean fire; - Iterator<Map.Entry<Long, Set<Context>>> it = processingTimeTimers.entrySet().iterator(); + do { + Timer<K, W> timer = processingTimeTimersQueue.peek(); + if (timer != null && timer.timestamp <= time) { + fire = true; - while (it.hasNext()) { - Map.Entry<Long, Set<Context>> triggers = it.next(); - if (triggers.getKey() <= time) { - toTrigger.add(triggers.getValue()); - it.remove(); - } - } + processingTimeTimers.remove(timer); + processingTimeTimersQueue.remove(); - for (Set<Context> ctxs: toTrigger) { - for (Context ctx: ctxs) { - // double check the time. it can happen that the trigger registers a new timer, - // in that case the entry is left in the processingTimeTimers set for - // performance reasons. We have to check here whether the entry in the set still - // reflects the currently set timer in the Context. - if (ctx.processingTimeTimer <= time) { - Trigger.TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer); - processTriggerResult(triggerResult, ctx.key, ctx.window); - } + context.key = timer.key; + context.window = timer.window; + Trigger.TriggerResult triggerResult = context.onProcessingTime(time); + processTriggerResult(triggerResult, context.key, context.window); + } else { + fire = false; } - } + } while (fire); + + // Also check any watermark timers. We might have some in here since + // Context.registerEventTimeTimer sets a trigger if an event-time trigger is registered + // that is already behind the watermark. + processWatermark(new Watermark(currentWatermark)); } /** - * The {@code Context} is responsible for keeping track of the state of one pane. - * - * <p> - * A pane is the bucket of elements that have the same key (assigned by the - * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can - * be in multiple panes of it was assigned to multiple windows by the - * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all - * have their own instance of the {@code Trigger}. + * {@code Context} is a utility for handling {@code Trigger} invocations. It can be reused + * by setting the {@code key} and {@code window} fields. No internal state must be kept in + * the {@code Context} */ protected class Context implements Trigger.TriggerContext { protected K key; protected W window; - protected WindowBuffer<IN> windowBuffer; - - protected HashMap<String, Serializable> state; - - // use these to only allow one timer in flight at a time of each type - // if the trigger registers another timer this value here will be overwritten, - // the timer is not removed from the set of in-flight timers to improve performance. - // When a trigger fires it is just checked against the last timer that was set. - protected long watermarkTimer; - protected long processingTimeTimer; - - public Context(K key, - W window, - WindowBuffer<IN> windowBuffer) { + public Context(K key, W window) { this.key = key; this.window = window; - this.windowBuffer = windowBuffer; - state = new HashMap<>(); - - this.watermarkTimer = -1; - this.processingTimeTimer = -1; } - /** - * Constructs a new {@code Context} by reading from a {@link DataInputView} that - * contains a serialized context that we wrote in - * {@link #writeToState(AbstractStateBackend.CheckpointStateOutputView)} - */ - @SuppressWarnings("unchecked") - protected Context(DataInputView in, ClassLoader userClassloader) throws Exception { - this.key = keySerializer.deserialize(in); - this.window = windowSerializer.deserialize(in); - this.watermarkTimer = in.readLong(); - this.processingTimeTimer = in.readLong(); - - int stateSize = in.readInt(); - byte[] stateData = new byte[stateSize]; - in.read(stateData); - state = InstantiationUtil.deserializeObject(stateData, userClassloader); - - this.windowBuffer = windowBufferFactory.create(); - int numElements = in.readInt(); - MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); - for (int i = 0; i < numElements; i++) { - windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord()); + @Override + public <S extends Serializable> ValueState<S> getKeyValueState(String name, + Class<S> stateType, + S defaultState) { + requireNonNull(stateType, "The state type class must not be null"); + + TypeInformation<S> typeInfo; + try { + typeInfo = TypeExtractor.getForClass(stateType); } + catch (Exception e) { + throw new RuntimeException("Cannot analyze type '" + stateType.getName() + + "' from the class alone, due to generic type parameters. " + + "Please specify the TypeInformation directly.", e); + } + + return getKeyValueState(name, typeInfo, defaultState); } - /** - * Writes the {@code Context} to the given state checkpoint output. - */ - protected void writeToState(AbstractStateBackend.CheckpointStateOutputView out) throws IOException { - keySerializer.serialize(key, out); - windowSerializer.serialize(window, out); - out.writeLong(watermarkTimer); - out.writeLong(processingTimeTimer); - - byte[] serializedState = InstantiationUtil.serializeObject(state); - out.writeInt(serializedState.length); - out.write(serializedState, 0, serializedState.length); - - MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); - out.writeInt(windowBuffer.size()); - for (StreamRecord<IN> element: windowBuffer.getElements()) { - recordSerializer.serialize(element, out); - } + @Override + public <S extends Serializable> ValueState<S> getKeyValueState(String name, + TypeInformation<S> stateType, + S defaultState) { + + requireNonNull(name, "The name of the state must not be null"); + requireNonNull(stateType, "The state type information must not be null"); + + ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(name, defaultState, stateType.createSerializer(getExecutionConfig())); + return getPartitionedState(stateDesc); } @SuppressWarnings("unchecked") - public <S extends Serializable> ValueState<S> getKeyValueState(final String name, final S defaultState) { - return new ValueState<S>() { - @Override - public S value() throws IOException { - Serializable value = state.get(name); - if (value == null) { - state.put(name, defaultState); - value = defaultState; - } - return (S) value; - } - - @Override - public void update(S value) throws IOException { - state.put(name, value); - } - - @Override - public void clear() { - state.remove(name); - } - }; + public <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor) { + try { + return WindowOperator.this.getPartitionedState(window, windowSerializer, + stateDescriptor); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve state", e); + } } @Override public void registerProcessingTimeTimer(long time) { - if (this.processingTimeTimer == time) { - // we already have set a trigger for that time - return; - } - Set<Context> triggers = processingTimeTimers.get(time); - if (triggers == null) { + Timer<K, W> timer = new Timer<>(time, key, window); + if (processingTimeTimers.add(timer)) { + processingTimeTimersQueue.add(timer); getRuntimeContext().registerTimer(time, WindowOperator.this); - triggers = new HashSet<>(); - processingTimeTimers.put(time, triggers); } - this.processingTimeTimer = time; - triggers.add(this); } @Override public void registerEventTimeTimer(long time) { - if (watermarkTimer == time) { - // we already have set a trigger for that time - return; + Timer<K, W> timer = new Timer<>(time, key, window); + if (watermarkTimers.add(timer)) { + watermarkTimersQueue.add(timer); } - Set<Context> triggers = watermarkTimers.get(time); - if (triggers == null) { - triggers = new HashSet<>(); - watermarkTimers.put(time, triggers); + + if (time <= currentWatermark) { + // immediately schedule a trigger, so that we don't wait for the next + // watermark update to fire the watermark trigger + getRuntimeContext().registerTimer(time, WindowOperator.this); } - this.watermarkTimer = time; - triggers.add(this); } public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception { - Trigger.TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this); - if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) { - // fire now and don't wait for the next watermark update - Trigger.TriggerResult onEventTimeResult = onEventTime(watermarkTimer); - return Trigger.TriggerResult.merge(onElementResult, onEventTimeResult); - } else { - return onElementResult; - } + return trigger.onElement(element.getValue(), element.getTimestamp(), window, this); } public Trigger.TriggerResult onProcessingTime(long time) throws Exception { - if (time == processingTimeTimer) { - processingTimeTimer = -1; - return trigger.onProcessingTime(time, window, this); - } else { - return Trigger.TriggerResult.CONTINUE; - } + return trigger.onProcessingTime(time, window, this); } public Trigger.TriggerResult onEventTime(long time) throws Exception { - if (time == watermarkTimer) { - watermarkTimer = -1; - Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this); - - if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) { - // fire now and don't wait for the next watermark update - Trigger.TriggerResult secondTriggerResult = onEventTime(watermarkTimer); - return Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult); - } else { - return firstTriggerResult; - } + return trigger.onEventTime(time, window, this); + } + } - } else { - return Trigger.TriggerResult.CONTINUE; + /** + * Internal class for keeping track of in-flight timers. + */ + protected static class Timer<K, W extends Window> implements Comparable<Timer<K, W>> { + protected long timestamp; + protected K key; + protected W window; + + public Timer(long timestamp, K key, W window) { + this.timestamp = timestamp; + this.key = key; + this.window = window; + } + + @Override + public int compareTo(Timer<K, W> o) { + return Long.compare(this.timestamp, o.timestamp); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; } + if (o == null || getClass() != o.getClass()){ + return false; + } + + Timer<?, ?> timer = (Timer<?, ?>) o; + + return timestamp == timer.timestamp + && key.equals(timer.key) + && window.equals(timer.window); + + } + + @Override + public int hashCode() { + int result = (int) (timestamp ^ (timestamp >>> 32)); + result = 31 * result + key.hashCode(); + result = 31 * result + window.hashCode(); + return result; + } + + @Override + public String toString() { + return "Timer{" + + "timestamp=" + timestamp + + ", key=" + key + + ", window=" + window + + '}'; } } @@ -579,7 +483,7 @@ public class WindowOperator<K, IN, OUT, W extends Window> * {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing * time semantics. */ - public WindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) { + public WindowOperator<K, IN, ACC, OUT, W> enableSetProcessingTime(boolean setProcessingTime) { this.setProcessingTime = setProcessingTime; return this; } @@ -592,21 +496,25 @@ public class WindowOperator<K, IN, OUT, W extends Window> public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); - // we write the panes with the key/value maps into the stream - AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); + AbstractStateBackend.CheckpointStateOutputView out = + getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); - int numKeys = windows.size(); - out.writeInt(numKeys); + out.writeInt(watermarkTimersQueue.size()); + for (Timer<K, W> timer : watermarkTimersQueue) { + keySerializer.serialize(timer.key, out); + windowSerializer.serialize(timer.window, out); + out.writeLong(timer.timestamp); + } - for (Map.Entry<K, Map<W, Context>> keyWindows: windows.entrySet()) { - int numWindows = keyWindows.getValue().size(); - out.writeInt(numWindows); - for (Context context: keyWindows.getValue().values()) { - context.writeToState(out); - } + out.writeInt(processingTimeTimers.size()); + for (Timer<K, W> timer : processingTimeTimersQueue) { + keySerializer.serialize(timer.key, out); + windowSerializer.serialize(timer.window, out); + out.writeLong(timer.timestamp); } taskState.setOperatorState(out.closeAndGetHandle()); + return taskState; } @@ -620,22 +528,28 @@ public class WindowOperator<K, IN, OUT, W extends Window> StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState(); DataInputView in = inputState.getState(userClassloader); - int numKeys = in.readInt(); - this.windows = new HashMap<>(numKeys); - this.processingTimeTimers = new HashMap<>(); - this.watermarkTimers = new HashMap<>(); - - for (int i = 0; i < numKeys; i++) { - int numWindows = in.readInt(); - for (int j = 0; j < numWindows; j++) { - Context context = new Context(in, userClassloader); - Map<W, Context> keyWindows = windows.get(context.key); - if (keyWindows == null) { - keyWindows = new HashMap<>(numWindows); - windows.put(context.key, keyWindows); - } - keyWindows.put(context.window, context); - } + int numWatermarkTimers = in.readInt(); + watermarkTimers = new HashSet<>(numWatermarkTimers); + watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1)); + for (int i = 0; i < numWatermarkTimers; i++) { + K key = keySerializer.deserialize(in); + W window = windowSerializer.deserialize(in); + long timestamp = in.readLong(); + Timer<K, W> timer = new Timer<>(timestamp, key, window); + watermarkTimers.add(timer); + watermarkTimersQueue.add(timer); + } + + int numProcessingTimeTimers = in.readInt(); + processingTimeTimers = new HashSet<>(numProcessingTimeTimers); + processingTimeTimersQueue = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1)); + for (int i = 0; i < numProcessingTimeTimers; i++) { + K key = keySerializer.deserialize(in); + W window = windowSerializer.deserialize(in); + long timestamp = in.readLong(); + Timer<K, W> timer = new Timer<>(timestamp, key, window); + processingTimeTimers.add(timer); + processingTimeTimersQueue.add(timer); } } @@ -664,7 +578,7 @@ public class WindowOperator<K, IN, OUT, W extends Window> } @VisibleForTesting - public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() { - return windowBufferFactory; + public StateDescriptor<? extends MergingState<IN, ACC>> getStateDescriptor() { + return windowStateDescriptor; } } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 475a95d..037afe4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -518,7 +518,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { DataStream<String> window = map .windowAll(GlobalWindows.create()) .trigger(PurgingTrigger.of(CountTrigger.of(5))) - .apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() { + .apply(new AllWindowFunction<Iterable<Tuple2<Integer, String>>, String, GlobalWindow>() { @Override public void apply(GlobalWindow window, Iterable<Tuple2<Integer, String>> values, http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java index 9b0a6d0..9297ae6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java @@ -21,9 +21,13 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.collector.selector.OutputSelector; @@ -212,9 +216,11 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase { env.execute(); } + // Ignore because the count(10_000) window actually only emits one element during processing + // and all the rest in close() @SuppressWarnings("unchecked") - @Test @Ignore + @Test public void complexIntegrationTest3() throws Exception { //Heavy prime factorisation with maps and flatmaps @@ -248,6 +254,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase { DataStream<Long> sourceStream31 = env.generateSequence(1, 10000); DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000); + sourceStream31.filter(new PrimeFilterFunction()) .windowAll(GlobalWindows.create()) .trigger(PurgingTrigger.of(CountTrigger.of(100))) @@ -258,9 +265,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase { .max(0)) .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE); - sourceStream31.flatMap(new DivisorsFlatMapFunction()) - .union(sourceStream32.flatMap(new DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long, - Integer>>() { + sourceStream31 + .flatMap(new DivisorsFlatMapFunction()) + .union(sourceStream32.flatMap(new DivisorsFlatMapFunction())) + .map(new MapFunction<Long, Tuple2<Long,Integer>>() { @Override public Tuple2<Long, Integer> map(Long value) throws Exception { @@ -271,42 +279,49 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase { .window(GlobalWindows.create()) .trigger(PurgingTrigger.of(CountTrigger.of(10_000))) .sum(1) - .filter(new FilterFunction<Tuple2<Long, Integer>>() { - @Override - public boolean filter(Tuple2<Long, Integer> value) throws Exception { - return value.f0 < 100 || value.f0 > 19900; - } - }) - .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE); +// .filter(new FilterFunction<Tuple2<Long, Integer>>() { +// +// @Override +// public boolean filter(Tuple2<Long, Integer> value) throws Exception { +// return value.f0 < 100 || value.f0 > 19900; +// } +// }) + .print(); +// .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE); env.execute(); } @Test @Ignore + @SuppressWarnings("unchecked, rawtypes") public void complexIntegrationTest4() throws Exception { //Testing mapping and delta-policy windowing with custom class expected1 = "((100,100),0)\n" + "((120,122),5)\n" + "((121,125),6)\n" + "((138,144),9)\n" + - "((139,147),10)\n" + "((156,166),13)\n" + "((157,169),14)\n" + "((174,188),17)\n" + "((175,191),18)\n" + - "((192,210),21)\n" + "((193,213),22)\n" + "((210,232),25)\n" + "((211,235),26)\n" + "((228,254),29)\n" + - "((229,257),30)\n" + "((246,276),33)\n" + "((247,279),34)\n" + "((264,298),37)\n" + "((265,301),38)\n" + - "((282,320),41)\n" + "((283,323),42)\n" + "((300,342),45)\n" + "((301,345),46)\n" + "((318,364),49)\n" + - "((319,367),50)\n" + "((336,386),53)\n" + "((337,389),54)\n" + "((354,408),57)\n" + "((355,411),58)\n" + - "((372,430),61)\n" + "((373,433),62)\n" + "((390,452),65)\n" + "((391,455),66)\n" + "((408,474),69)\n" + - "((409,477),70)\n" + "((426,496),73)\n" + "((427,499),74)\n" + "((444,518),77)\n" + "((445,521),78)\n" + - "((462,540),81)\n" + "((463,543),82)\n" + "((480,562),85)\n" + "((481,565),86)\n" + "((498,584),89)\n" + - "((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)"; + "((139,147),10)\n" + "((156,166),13)\n" + "((157,169),14)\n" + "((174,188),17)\n" + "((175,191),18)\n" + + "((192,210),21)\n" + "((193,213),22)\n" + "((210,232),25)\n" + "((211,235),26)\n" + "((228,254),29)\n" + + "((229,257),30)\n" + "((246,276),33)\n" + "((247,279),34)\n" + "((264,298),37)\n" + "((265,301),38)\n" + + "((282,320),41)\n" + "((283,323),42)\n" + "((300,342),45)\n" + "((301,345),46)\n" + "((318,364),49)\n" + + "((319,367),50)\n" + "((336,386),53)\n" + "((337,389),54)\n" + "((354,408),57)\n" + "((355,411),58)\n" + + "((372,430),61)\n" + "((373,433),62)\n" + "((390,452),65)\n" + "((391,455),66)\n" + "((408,474),69)\n" + + "((409,477),70)\n" + "((426,496),73)\n" + "((427,499),74)\n" + "((444,518),77)\n" + "((445,521),78)\n" + + "((462,540),81)\n" + "((463,543),82)\n" + "((480,562),85)\n" + "((481,565),86)\n" + "((498,584),89)\n" + + "((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); + TupleSerializer<Tuple2<Rectangle, Integer>> deltaSerializer = new TupleSerializer<>((Class) Tuple2.class, + new TypeSerializer[] {new KryoSerializer<>(Rectangle.class, env.getConfig()), + IntSerializer.INSTANCE}); + env.addSource(new RectangleSource()) .global() .map(new RectangleMapFunction()) .windowAll(GlobalWindows.create()) - .trigger(PurgingTrigger.of(DeltaTrigger.of(0.0, new MyDelta()))) + .trigger(PurgingTrigger.of(DeltaTrigger.of(0.0, new MyDelta(), deltaSerializer))) .apply(new MyWindowMapFunction()) .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE); @@ -674,7 +689,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase { } } - private static class MyWindowMapFunction implements AllWindowFunction<Tuple2<Rectangle, Integer>, Tuple2<Rectangle, Integer>, GlobalWindow> { + private static class MyWindowMapFunction implements AllWindowFunction<Iterable<Tuple2<Rectangle, Integer>>, Tuple2<Rectangle, Integer>, GlobalWindow> { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 02bb8b7..70adadf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -66,7 +66,7 @@ import static org.junit.Assert.*; public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @SuppressWarnings("unchecked") - private final WindowFunction<String, String, String, TimeWindow> mockFunction = mock(WindowFunction.class); + private final WindowFunction<Iterable<String>, String, String, TimeWindow> mockFunction = mock(WindowFunction.class); @SuppressWarnings("unchecked") private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class); @@ -78,8 +78,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } }; - private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction = - new WindowFunction<Integer, Integer, Integer, TimeWindow>() + private final WindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> validatingIdentityFunction = + new WindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow>() { @Override public void apply(Integer key, @@ -727,7 +727,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // ------------------------------------------------------------------------ - private static class FailingFunction implements WindowFunction<Integer, Integer, Integer, TimeWindow> { + private static class FailingFunction implements WindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> { private final int failAfterElements; @@ -755,7 +755,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // ------------------------------------------------------------------------ - private static class StatefulFunction extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> { + private static class StatefulFunction extends RichWindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> { // we use a concurrent map here even though there is no concurrency, to // get "volatile" style access to entries http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 35bd209..5c37f36 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -767,7 +767,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { public void testKeyValueStateInWindowFunctionTumbling() { final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); try { - final long thirtySeconds = 30_000; + final long twoSeconds = 2000; final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(); final Object lock = new Object(); @@ -778,7 +778,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( new StatefulFunction(), fieldOneSelector, - IntSerializer.INSTANCE, tupleSerializer, thirtySeconds, thirtySeconds); + IntSerializer.INSTANCE, tupleSerializer, twoSeconds, twoSeconds); op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE), out); op.open(); @@ -798,18 +798,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { } } - out.waitForNElements(2, 60_000); - - List<Tuple2<Integer, Integer>> result = out.getElements(); - assertEquals(2, result.size()); - - Collections.sort(result, tupleComparator); - assertEquals(45, result.get(0).f1.intValue()); - assertEquals(45, result.get(1).f1.intValue()); - - assertEquals(10, StatefulFunction.globalCounts.get(1).intValue()); - assertEquals(10, StatefulFunction.globalCounts.get(2).intValue()); - + while (StatefulFunction.globalCounts.get(1) < 10 || + StatefulFunction.globalCounts.get(2) < 10) + { + Thread.sleep(50); + } + op.close(); op.dispose(); } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 282c71f..d9ba872 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -17,7 +17,7 @@ */ package org.apache.flink.streaming.runtime.operators.windowing; -import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; @@ -77,7 +77,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + .apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -126,7 +126,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + .apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -177,7 +177,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + .apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -204,7 +204,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase // UDFs // ------------------------------------------------------------------------ - public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> { + public static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java index 39033cc..571838f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java @@ -22,8 +22,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; @@ -56,7 +55,7 @@ public class EvictingNonKeyedWindowOperatorTest { GlobalWindows.create(), new GlobalWindow.Serializer(), new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(), - new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)), + new ReduceIterableAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), CountTrigger.of(WINDOW_SLIDE), CountEvictor.of(WINDOW_SIZE)); @@ -96,10 +95,6 @@ public class EvictingNonKeyedWindowOperatorTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); testHarness.close(); - - Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - - } // ------------------------------------------------------------------------ @@ -109,32 +104,9 @@ public class EvictingNonKeyedWindowOperatorTest { public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; - private boolean openCalled = false; - - private AtomicInteger closeCalled; - - public SumReducer(AtomicInteger closeCalled) { - this.closeCalled = closeCalled; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - openCalled = true; - } - - @Override - public void close() throws Exception { - super.close(); - closeCalled.incrementAndGet(); - } - @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { - if (!openCalled) { - Assert.fail("Open was not called"); - } return new Tuple2<>(value2.f0, value1.f1 + value2.f1); } } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java index 1821308..2f1dce5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java @@ -18,22 +18,27 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; -import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; @@ -48,27 +53,35 @@ public class EvictingWindowOperatorTest { @Test @SuppressWarnings("unchecked") public void testCountTrigger() throws Exception { - AtomicInteger closeCalled = new AtomicInteger(0); final int WINDOW_SIZE = 4; final int WINDOW_SLIDE = 2; + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + + ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", + new StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig()))); + + EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), - new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(), - new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)), + stateDesc, + new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), CountTrigger.of(WINDOW_SLIDE), CountEvictor.of(WINDOW_SIZE)); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + operator.setInputType(inputType, new ExecutionConfig()); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new OneInputStreamOperatorTestHarness<>(operator); + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + long initialTime = 0L; ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -105,24 +118,104 @@ public class EvictingWindowOperatorTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); testHarness.close(); + } - Assert.assertEquals("Close was not called.", 1, closeCalled.get()); + @Test + @SuppressWarnings("unchecked") + public void testCountTriggerWithApply() throws Exception { + AtomicInteger closeCalled = new AtomicInteger(0); + + final int WINDOW_SIZE = 4; + final int WINDOW_SLIDE = 2; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + + ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents", + new StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig()))); + + + EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( + GlobalWindows.create(), + new GlobalWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new RichSumReducer<GlobalWindow>(closeCalled), + CountTrigger.of(WINDOW_SLIDE), + CountEvictor.of(WINDOW_SIZE)); + + operator.setInputType(inputType, new ExecutionConfig()); + + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + long initialTime = 0L; + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.open(); + + // The global window actually ignores these timestamps... + + // add elements out-of-order + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000)); + + + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000)); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + + testHarness.close(); + Assert.assertEquals("Close was not called.", 1, closeCalled.get()); } // ------------------------------------------------------------------------ // UDFs // ------------------------------------------------------------------------ - public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> { + public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, + Tuple2<String, Integer> value2) throws Exception { + return new Tuple2<>(value2.f0, value1.f1 + value2.f1); + } + } + + public static class RichSumReducer<W extends Window> extends RichWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, String, W> { private static final long serialVersionUID = 1L; private boolean openCalled = false; - private AtomicInteger closeCalled; + private AtomicInteger closeCalled = new AtomicInteger(0); - public SumReducer(AtomicInteger closeCalled) { + public RichSumReducer(AtomicInteger closeCalled) { this.closeCalled = closeCalled; } @@ -139,13 +232,23 @@ public class EvictingWindowOperatorTest { } @Override - public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, - Tuple2<String, Integer> value2) throws Exception { + public void apply(String key, + W window, + Iterable<Tuple2<String, Integer>> input, + Collector<Tuple2<String, Integer>> out) throws Exception { + if (!openCalled) { Assert.fail("Open was not called"); } - return new Tuple2<>(value2.f0, value1.f1 + value2.f1); + int sum = 0; + + for (Tuple2<String, Integer> t: input) { + sum += t.f1; + } + out.collect(new Tuple2<>(key, sum)); + } + } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java index 02e032a..c0e6ad4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java @@ -18,11 +18,12 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; @@ -77,7 +78,7 @@ public class NonKeyedWindowOperatorTest { SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), new TimeWindow.Serializer(), windowBufferFactory, - new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()), + new ReduceIterableAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()), EventTimeTrigger.create()); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -140,11 +141,6 @@ public class NonKeyedWindowOperatorTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); testHarness.close(); - if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) { - Assert.assertEquals("Close was not called.", 2, closeCalled.get()); - } else { - Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - } } @Test @@ -158,7 +154,7 @@ public class NonKeyedWindowOperatorTest { TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), new TimeWindow.Serializer(), windowBufferFactory, - new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()), + new ReduceIterableAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()), EventTimeTrigger.create()); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -219,11 +215,6 @@ public class NonKeyedWindowOperatorTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); testHarness.close(); - if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) { - Assert.assertEquals("Close was not called.", 2, closeCalled.get()); - } else { - Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - } } @Test @@ -237,7 +228,7 @@ public class NonKeyedWindowOperatorTest { GlobalWindows.create(), new GlobalWindow.Serializer(), windowBufferFactory, - new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), + new ReduceIterableAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -298,11 +289,6 @@ public class NonKeyedWindowOperatorTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); testHarness.close(); - if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) { - Assert.assertEquals("Close was not called.", 2, closeCalled.get()); - } else { - Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - } } @Test @@ -316,7 +302,7 @@ public class NonKeyedWindowOperatorTest { GlobalWindows.create(), new GlobalWindow.Serializer(), windowBufferFactory, - new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), + new ReduceIterableAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE))); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse( @@ -355,19 +341,23 @@ public class NonKeyedWindowOperatorTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); testHarness.close(); - if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) { - Assert.assertEquals("Close was not called.", 2, closeCalled.get()); - } else { - Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - } - } // ------------------------------------------------------------------------ // UDFs // ------------------------------------------------------------------------ - public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> { + public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, + Tuple2<String, Integer> value2) throws Exception { + return new Tuple2<>(value2.f0, value1.f1 + value2.f1); + } + } + + public static class RichSumReducer extends RichReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; private boolean openCalled = false; @@ -400,7 +390,7 @@ public class NonKeyedWindowOperatorTest { @Parameterized.Parameters(name = "WindowBuffer = {0}") @SuppressWarnings("unchecked,rawtypes") public static Collection<WindowBufferFactory[]> windowBuffers(){ - return Arrays.asList(new WindowBufferFactory[]{new PreAggregatingHeapWindowBuffer.Factory(new SumReducer())}, + return Arrays.asList(new WindowBufferFactory[]{new PreAggregatingHeapWindowBuffer.Factory(new RichSumReducer())}, new WindowBufferFactory[]{new HeapWindowBuffer.Factory()} ); } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java index 76c6f20..b99232a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java @@ -17,7 +17,7 @@ */ package org.apache.flink.streaming.runtime.operators.windowing; -import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; @@ -68,7 +68,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window2 = source .keyBy(0) .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS)) - .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + .apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -111,7 +111,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window2 = source .timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS)) - .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { + .apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -132,7 +132,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase // UDFs // ------------------------------------------------------------------------ - public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> { + public static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override
