Repository: flink Updated Branches: refs/heads/master 27b5c49e7 -> 20884c07d
[FLINK-3528] Add FoldingWindowBuffer for Non-Keyed Windows This makes AllWindowedStream.fold() take constant space, just like the keyed WindowOperator. Also this adds a new test case in EventTimeAllWindowCheckpointingITCase to verify that the FoldingWindowBuffer works. This also renames the preexisting window buffers to ReducingWindowBuffer and ListWindowBuffer to make the naming scheme consistent. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20884c07 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20884c07 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20884c07 Branch: refs/heads/master Commit: 20884c07d1a0cf2370ea4c565d1bf881ad483167 Parents: 27b5c49 Author: Aljoscha Krettek <[email protected]> Authored: Fri Feb 26 23:27:26 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Sat Feb 27 00:18:40 2016 +0100 ---------------------------------------------------------------------- .../api/common/functions/FoldFunction.java | 2 +- .../api/datastream/AllWindowedStream.java | 27 +-- .../EvictingNonKeyedWindowOperator.java | 9 +- .../windowing/NonKeyedWindowOperator.java | 41 ++--- .../windowing/buffers/EvictingWindowBuffer.java | 3 +- .../windowing/buffers/FoldingWindowBuffer.java | 163 +++++++++++++++++++ .../windowing/buffers/HeapWindowBuffer.java | 94 ----------- .../windowing/buffers/ListWindowBuffer.java | 127 +++++++++++++++ .../buffers/PreAggregatingHeapWindowBuffer.java | 99 ----------- .../windowing/buffers/ReducingWindowBuffer.java | 121 ++++++++++++++ .../windowing/buffers/WindowBuffer.java | 15 +- .../windowing/buffers/WindowBufferFactory.java | 29 ++-- .../windowing/AllWindowTranslationTest.java | 67 +++++++- .../EvictingNonKeyedWindowOperatorTest.java | 12 +- .../windowing/NonKeyedWindowOperatorTest.java | 43 ++--- .../api/scala/AllWindowTranslationTest.scala | 26 +-- .../EventTimeAllWindowCheckpointingITCase.java | 72 ++++++++ 17 files changed, 633 
insertions(+), 317 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java index 8194663..b52828e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java @@ -40,7 +40,7 @@ import java.io.Serializable; * @param <O> Type of the elements that the group/list/stream contains */ @Public -public interface FoldFunction<O,T> extends Function, Serializable { +public interface FoldFunction<O, T> extends Function, Serializable { /** * The core method of FoldFunction, combining two values into one value of the same type. * The fold function is consecutively applied to all values of a group until only a single value remains. 
http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 6b32880..268dd8c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -43,8 +43,9 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer; /** * A {@code AllWindowedStream} represents a data stream where the stream of @@ -157,7 +158,7 @@ public class AllWindowedStream<T, W extends Window> { if (evictor != null) { operator = new EvictingNonKeyedWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - new HeapWindowBuffer.Factory<T>(), + new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())), new ReduceIterableAllWindowFunction<W, T>(function), 
trigger, evictor); @@ -165,7 +166,7 @@ public class AllWindowedStream<T, W extends Window> { } else { operator = new NonKeyedWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - new PreAggregatingHeapWindowBuffer.Factory<>(function), + new ReducingWindowBuffer.Factory<>(function, getInputType().createSerializer(getExecutionEnvironment().getConfig())), new ReduceIterableAllWindowFunction<W, T>(function), trigger); } @@ -255,12 +256,12 @@ public class AllWindowedStream<T, W extends Window> { String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")"; - NonKeyedWindowOperator<T, R, W> operator; + NonKeyedWindowOperator<T, T, R, W> operator; if (evictor != null) { operator = new EvictingNonKeyedWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - new HeapWindowBuffer.Factory<T>(), + new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())), function, trigger, evictor); @@ -268,7 +269,7 @@ public class AllWindowedStream<T, W extends Window> { } else { operator = new NonKeyedWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - new HeapWindowBuffer.Factory<T>(), + new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())), function, trigger); } @@ -329,7 +330,7 @@ public class AllWindowedStream<T, W extends Window> { if (evictor != null) { operator = new EvictingNonKeyedWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - new HeapWindowBuffer.Factory<T>(), + new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())), new ReduceApplyAllWindowFunction<>(preAggregator, function), trigger, evictor); @@ -337,8 +338,8 @@ public class AllWindowedStream<T, W extends Window> { } else { operator = 
new NonKeyedWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator), - new ReduceApplyAllWindowFunction<>(preAggregator, function), + new ReducingWindowBuffer.Factory<>(preAggregator, getInputType().createSerializer(getExecutionEnvironment().getConfig())), + function, trigger); } @@ -400,7 +401,7 @@ public class AllWindowedStream<T, W extends Window> { operator = new EvictingNonKeyedWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - new HeapWindowBuffer.Factory<T>(), + new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())), new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function), trigger, evictor); @@ -410,8 +411,8 @@ public class AllWindowedStream<T, W extends Window> { operator = new NonKeyedWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - new HeapWindowBuffer.Factory<T>(), - new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function), + new FoldingWindowBuffer.Factory<>(foldFunction, initialValue, resultType.createSerializer(getExecutionEnvironment().getConfig())), + function, trigger); } http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java index 221367d..22d207d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java 
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java @@ -36,11 +36,12 @@ import static java.util.Objects.requireNonNull; * @see org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator * * @param <IN> The type of the incoming elements. + * @param <ACC> The type of elements stored in the window buffers. * @param <OUT> The type of elements emitted by the {@code WindowFunction}. * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. */ @Internal -public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends NonKeyedWindowOperator<IN, OUT, W> { +public class EvictingNonKeyedWindowOperator<IN, ACC, OUT, W extends Window> extends NonKeyedWindowOperator<IN, ACC, OUT, W> { private static final long serialVersionUID = 1L; @@ -48,8 +49,8 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, - WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory, - AllWindowFunction<IN, OUT, W> windowFunction, + WindowBufferFactory<? super IN, ACC, ? extends EvictingWindowBuffer<IN, ACC>> windowBufferFactory, + AllWindowFunction<ACC, OUT, W> windowFunction, Trigger<? super IN, ? super W> trigger, Evictor<? super IN, ? 
super W> evictor) { super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger); @@ -60,7 +61,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N @SuppressWarnings("unchecked, rawtypes") protected void emitWindow(Context context) throws Exception { timestampedCollector.setAbsoluteTimestamp(context.window.maxTimestamp()); - EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer; + EvictingWindowBuffer<IN, ACC> windowBuffer = (EvictingWindowBuffer<IN, ACC>) context.windowBuffer; int toEvict = 0; if (windowBuffer.size() > 0) { http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java index 95feadc..6bd5c7d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java @@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.util.InstantiationUtil; @@ 
-72,12 +71,13 @@ import static java.util.Objects.requireNonNull; * @see org.apache.flink.streaming.runtime.operators.windowing.WindowOperator * * @param <IN> The type of the incoming elements. + * @param <ACC> The type of elements stored in the window buffers. * @param <OUT> The type of elements emitted by the {@code WindowFunction}. * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. */ @Internal -public class NonKeyedWindowOperator<IN, OUT, W extends Window> - extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>> +public class NonKeyedWindowOperator<IN, ACC, OUT, W extends Window> + extends AbstractUdfStreamOperator<OUT, AllWindowFunction<ACC, OUT, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -92,7 +92,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> private final Trigger<? super IN, ? super W> trigger; - private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory; + private final WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory; /** * This is used to copy the incoming element because it can be put into several window @@ -145,8 +145,8 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> */ public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, - WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory, - AllWindowFunction<IN, OUT, W> windowFunction, + WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory, + AllWindowFunction<ACC, OUT, W> windowFunction, Trigger<? super IN, ? 
super W> trigger) { super(windowFunction); @@ -180,9 +180,6 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> throw new IllegalStateException("Input serializer was not set."); } - windowBufferFactory.setRuntimeContext(getRuntimeContext()); - windowBufferFactory.open(getUserFunctionParameters()); - // these could already be initialized from restoreState() if (watermarkTimers == null) { watermarkTimers = new HashMap<>(); @@ -221,11 +218,6 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> public final void dispose() { super.dispose(); windows.clear(); - try { - windowBufferFactory.close(); - } catch (Exception e) { - throw new RuntimeException("Error while closing WindowBufferFactory.", e); - } } @Override @@ -236,7 +228,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> for (W window: elementWindows) { Context context = windows.get(window); if (context == null) { - WindowBuffer<IN> windowBuffer = windowBufferFactory.create(); + WindowBuffer<IN, ACC> windowBuffer = windowBufferFactory.create(); context = new Context(window, windowBuffer); windows.put(window, context); } @@ -356,7 +348,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> protected class Context implements TriggerContext { protected W window; - protected WindowBuffer<IN> windowBuffer; + protected WindowBuffer<IN, ACC> windowBuffer; protected HashMap<String, Serializable> state; @@ -369,7 +361,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> public Context( W window, - WindowBuffer<IN> windowBuffer) { + WindowBuffer<IN, ACC> windowBuffer) { this.window = window; this.windowBuffer = windowBuffer; state = new HashMap<>(); @@ -394,12 +386,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> in.read(stateData); state = InstantiationUtil.deserializeObject(stateData, userClassloader); - this.windowBuffer = windowBufferFactory.create(); - int numElements = in.readInt(); - MultiplexingStreamRecordSerializer<IN> 
recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); - for (int i = 0; i < numElements; i++) { - windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord()); - } + this.windowBuffer = windowBufferFactory.restoreFromSnapshot(in); } protected void writeToState(AbstractStateBackend.CheckpointStateOutputView out) throws IOException { @@ -411,11 +398,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> out.writeInt(serializedState.length); out.write(serializedState, 0, serializedState.length); - MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); - out.writeInt(windowBuffer.size()); - for (StreamRecord<IN> element: windowBuffer.getElements()) { - recordSerializer.serialize(element, out); - } + windowBuffer.snapshot(out); } @Override @@ -635,7 +618,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> } @VisibleForTesting - public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() { + public WindowBufferFactory<? super IN, ACC, ? 
extends WindowBuffer<IN, ACC>> getWindowBufferFactory() { return windowBufferFactory; } } http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java index 25a8211..75f646d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java @@ -25,9 +25,10 @@ import org.apache.flink.annotation.Internal; * the buffer. * * @param <T> The type of elements that this {@code WindowBuffer} can store. + * @param <O> The type of elements that this window buffer will return when asked for its contents. */ @Internal -public interface EvictingWindowBuffer<T> extends WindowBuffer<T> { +public interface EvictingWindowBuffer<T, O> extends WindowBuffer<T, O> { /** * Removes the given number of elements, starting from the beginning. 
http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java new file mode 100644 index 0000000..fa44f9d --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.runtime.operators.windowing.buffers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collections; + +/** + * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a + * {@link FoldFunction} to incrementally aggregate elements that are added to the buffer. + * + * @param <T> The type of elements that can be added to this {@code WindowBuffer}. + * @param <ACC> The type of the accumulator that this {@code WindowBuffer} can store. 
+ */ +@Internal +public class FoldingWindowBuffer<T, ACC> implements WindowBuffer<T, ACC> { + + private final FoldFunction<T, ACC> foldFunction; + private final TypeSerializer<ACC> accSerializer; + private StreamRecord<ACC> data; + + protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, ACC initialAccumulator, TypeSerializer<ACC> accSerializer) { + this.foldFunction = foldFunction; + this.accSerializer = accSerializer; + this.data = new StreamRecord<>(initialAccumulator); + } + + protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, StreamRecord<ACC> initialAccumulator, TypeSerializer<ACC> accSerializer) { + this.foldFunction = foldFunction; + this.accSerializer = accSerializer; + this.data = initialAccumulator; + } + + @Override + public void storeElement(StreamRecord<T> element) throws Exception { + data.replace(foldFunction.fold(data.getValue(), element.getValue()), element.getTimestamp()); + } + + @Override + public Iterable<StreamRecord<ACC>> getElements() { + return Collections.singleton(data); + } + + @Override + public Iterable<ACC> getUnpackedElements() { + return Collections.singleton(data.getValue()); + } + + @Override + public int size() { + return 1; + } + + @Override + public void snapshot(DataOutputView out) throws IOException { + MultiplexingStreamRecordSerializer<ACC> recordSerializer = new MultiplexingStreamRecordSerializer<>(accSerializer); + recordSerializer.serialize(data, out); + } + + public static class Factory<T, ACC> implements WindowBufferFactory<T, ACC, FoldingWindowBuffer<T, ACC>> { + private static final long serialVersionUID = 1L; + + private final FoldFunction<T, ACC> foldFunction; + + private final TypeSerializer<ACC> accSerializer; + + private transient ACC initialAccumulator; + + public Factory(FoldFunction<T, ACC> foldFunction, ACC initialValue, TypeSerializer<ACC> accSerializer) { + this.foldFunction = foldFunction; + this.accSerializer = accSerializer; + this.initialAccumulator = initialValue; + } + + 
@Override + public FoldingWindowBuffer<T, ACC> create() { + return new FoldingWindowBuffer<>(foldFunction, accSerializer.copy(initialAccumulator), accSerializer); + } + + @Override + public FoldingWindowBuffer<T, ACC> restoreFromSnapshot(DataInputView in) throws IOException { + MultiplexingStreamRecordSerializer<ACC> recordSerializer = new MultiplexingStreamRecordSerializer<>(accSerializer); + StreamElement element = recordSerializer.deserialize(in); + return new FoldingWindowBuffer<>(foldFunction, element.<ACC>asRecord(), accSerializer); + } + + private void writeObject(final ObjectOutputStream out) throws IOException { + // write all the non-transient fields + out.defaultWriteObject(); + + + byte[] serializedDefaultValue; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) + { + accSerializer.serialize(initialAccumulator, outView); + + outView.flush(); + serializedDefaultValue = baos.toByteArray(); + } + catch (Exception e) { + throw new IOException("Unable to serialize initial accumulator of type " + + initialAccumulator.getClass().getSimpleName() + ".", e); + } + + out.writeInt(serializedDefaultValue.length); + out.write(serializedDefaultValue); + } + + private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { + // read the non-transient fields + in.defaultReadObject(); + + // read the default value field + int size = in.readInt(); + byte[] buffer = new byte[size]; + int bytesRead = in.read(buffer); + + if (bytesRead != size) { + throw new RuntimeException("Read size does not match expected size."); + } + + try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer); + DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) + { + initialAccumulator = accSerializer.deserialize(inView); + } + catch (Exception e) { + throw new IOException("Unable to deserialize initial accumulator.", e); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java deleted file mode 100644 index 9db449b..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.runtime.operators.windowing.buffers; - -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import java.util.ArrayDeque; - -/** - * An {@link EvictingWindowBuffer} that stores elements on the Java Heap. - * - * @param <T> The type of elements that this {@code WindowBuffer} can store. - */ -@Internal -public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> { - private static final long serialVersionUID = 1L; - - private ArrayDeque<StreamRecord<T>> elements; - - protected HeapWindowBuffer() { - this.elements = new ArrayDeque<>(); - } - - @Override - public void storeElement(StreamRecord<T> element) { - elements.add(element); - } - - @Override - public void removeElements(int count) { - // TODO determine if this can be done in a better way - for (int i = 0; i < count; i++) { - elements.removeFirst(); - } - } - - @Override - public Iterable<StreamRecord<T>> getElements() { - return elements; - } - - @Override - public Iterable<T> getUnpackedElements() { - return FluentIterable.from(elements).transform(new Function<StreamRecord<T>, T>() { - @Override - public T apply(StreamRecord<T> record) { - return record.getValue(); - } - }); - } - - @Override - public int size() { - return elements.size(); - } - - public static class Factory<T> implements WindowBufferFactory<T, HeapWindowBuffer<T>> { - private static final long serialVersionUID = 1L; - - @Override - public void setRuntimeContext(RuntimeContext ctx) {} - - @Override - public void open(Configuration config) {} - - @Override - public void close() {} - - @Override - public HeapWindowBuffer<T> create() { - return new HeapWindowBuffer<>(); - } - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java new file mode 100644 index 0000000..5b9dd3c --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.runtime.operators.windowing.buffers; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.IOException; +import java.util.ArrayDeque; + +/** + * An {@link EvictingWindowBuffer} that stores elements on the Java Heap. + * + * @param <T> The type of elements that this {@code WindowBuffer} can store. + */ +@Internal +public class ListWindowBuffer<T> implements EvictingWindowBuffer<T, T> { + + private final TypeSerializer<T> serializer; + + private ArrayDeque<StreamRecord<T>> elements; + + protected ListWindowBuffer(TypeSerializer<T> serializer) { + this.serializer = serializer; + this.elements = new ArrayDeque<>(); + } + + protected ListWindowBuffer(ArrayDeque<StreamRecord<T>> elements, TypeSerializer<T> serializer) { + this.serializer = serializer; + this.elements = elements; + } + + @Override + public void storeElement(StreamRecord<T> element) { + elements.add(element); + } + + @Override + public void removeElements(int count) { + // TODO determine if this can be done in a better way + for (int i = 0; i < count; i++) { + elements.removeFirst(); + } + } + + @Override + public Iterable<StreamRecord<T>> getElements() { + return elements; + } + + @Override + public Iterable<T> getUnpackedElements() { + return FluentIterable.from(elements).transform(new Function<StreamRecord<T>, T>() { + @Override + public T apply(StreamRecord<T> record) { + return record.getValue(); + } + }); + } + + @Override + public int size() { + return elements.size(); + } + + @Override + public void snapshot(DataOutputView out) throws 
IOException { + out.writeInt(elements.size()); + + MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer); + + for (StreamRecord<T> e: elements) { + recordSerializer.serialize(e, out); + } + } + + public static class Factory<T> implements WindowBufferFactory<T, T, ListWindowBuffer<T>> { + private static final long serialVersionUID = 1L; + + private final TypeSerializer<T> serializer; + + public Factory(TypeSerializer<T> serializer) { + this.serializer = serializer; + } + + @Override + public ListWindowBuffer<T> create() { + return new ListWindowBuffer<>(serializer); + } + + @Override + public ListWindowBuffer<T> restoreFromSnapshot(DataInputView in) throws IOException { + int size = in.readInt(); + + MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer); + + ArrayDeque<StreamRecord<T>> elements = new ArrayDeque<>(); + + for (int i = 0; i < size; i++) { + elements.add(recordSerializer.deserialize(in).<T>asRecord()); + } + + return new ListWindowBuffer<>(elements, serializer); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java deleted file mode 100644 index 5f8de4b..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.runtime.operators.windowing.buffers; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.functions.util.FunctionUtils; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import java.util.Collections; - -/** - * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a - * {@link ReduceFunction} to pre-aggregate elements that are added to the buffer. - * - * @param <T> The type of elements that this {@code WindowBuffer} can store. 
- */ -@Internal -public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> { - private static final long serialVersionUID = 1L; - - private final ReduceFunction<T> reduceFunction; - private transient StreamRecord<T> data; - - protected PreAggregatingHeapWindowBuffer(ReduceFunction<T> reduceFunction) { - this.reduceFunction = reduceFunction; - } - - @Override - public void storeElement(StreamRecord<T> element) throws Exception { - if (data == null) { - data = new StreamRecord<>(element.getValue(), element.getTimestamp()); - } else { - data.replace(reduceFunction.reduce(data.getValue(), element.getValue())); - } - } - - @Override - public Iterable<StreamRecord<T>> getElements() { - return Collections.singleton(data); - } - - @Override - public Iterable<T> getUnpackedElements() { - return Collections.singleton(data.getValue()); - } - - @Override - public int size() { - return 1; - } - - public static class Factory<T> implements WindowBufferFactory<T, PreAggregatingHeapWindowBuffer<T>> { - private static final long serialVersionUID = 1L; - - private final ReduceFunction<T> reduceFunction; - - public Factory(ReduceFunction<T> reduceFunction) { - this.reduceFunction = reduceFunction; - } - - @Override - public void setRuntimeContext(RuntimeContext ctx) { - FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx); - } - - @Override - public void open(Configuration config) throws Exception { - FunctionUtils.openFunction(reduceFunction, config); - } - - @Override - public void close() throws Exception { - FunctionUtils.closeFunction(reduceFunction); - } - - @Override - public PreAggregatingHeapWindowBuffer<T> create() { - return new PreAggregatingHeapWindowBuffer<>(reduceFunction); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java new file mode 100644 index 0000000..1f2b639 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.buffers; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.IOException; +import java.util.Collections; + +/** + * A {@link WindowBuffer} that stores elements on the Java Heap. 
This buffer uses a + * {@link ReduceFunction} to incrementally aggregate elements that are added to the buffer. + * + * @param <T> The type of elements that this {@code WindowBuffer} can store. + */ +@Internal +public class ReducingWindowBuffer<T> implements WindowBuffer<T, T> { + + private final ReduceFunction<T> reduceFunction; + private final TypeSerializer<T> serializer; + private StreamRecord<T> data; + + protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, TypeSerializer<T> serializer) { + this.reduceFunction = reduceFunction; + this.serializer = serializer; + this.data = null; + } + + protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, StreamRecord<T> data, TypeSerializer<T> serializer) { + this.reduceFunction = reduceFunction; + this.serializer = serializer; + this.data = data; + } + + @Override + public void storeElement(StreamRecord<T> element) throws Exception { + if (data == null) { + data = new StreamRecord<>(element.getValue(), element.getTimestamp()); + } else { + data.replace(reduceFunction.reduce(data.getValue(), element.getValue())); + } + } + + @Override + public Iterable<StreamRecord<T>> getElements() { + return Collections.singleton(data); + } + + @Override + public Iterable<T> getUnpackedElements() { + return Collections.singleton(data.getValue()); + } + + @Override + public int size() { + return 1; + } + + @Override + public void snapshot(DataOutputView out) throws IOException { + if (data != null) { + out.writeBoolean(true); + MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer); + recordSerializer.serialize(data, out); + } else { + out.writeBoolean(false); + } + } + + public static class Factory<T> implements WindowBufferFactory<T, T, ReducingWindowBuffer<T>> { + private static final long serialVersionUID = 1L; + + private final ReduceFunction<T> reduceFunction; + + private final TypeSerializer<T> serializer; + + public Factory(ReduceFunction<T> 
reduceFunction, TypeSerializer<T> serializer) { + this.reduceFunction = reduceFunction; + this.serializer = serializer; + } + + @Override + public ReducingWindowBuffer<T> create() { + return new ReducingWindowBuffer<>(reduceFunction, serializer); + } + + @Override + public ReducingWindowBuffer<T> restoreFromSnapshot(DataInputView in) throws IOException { + boolean hasValue = in.readBoolean(); + if (hasValue) { + MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer); + StreamElement element = recordSerializer.deserialize(in); + return new ReducingWindowBuffer<>(reduceFunction, element.<T>asRecord(), serializer); + } else { + return new ReducingWindowBuffer<>(reduceFunction, serializer); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java index cbf7dda..16be0f3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java @@ -19,10 +19,11 @@ package org.apache.flink.streaming.runtime.operators.windowing.buffers; import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import java.io.Serializable; +import java.io.IOException; /** * A {@code WindowBuffer} is used by @@ -37,9 +38,10 @@ import 
java.io.Serializable; * have their own instance of the {@code Evictor}. * * @param <T> The type of elements that this {@code WindowBuffer} can store. + * @param <O> The type of elements that this window buffer will return when asked for its contents. */ @Internal -public interface WindowBuffer<T> extends Serializable { +public interface WindowBuffer<T, O> { /** * Adds the element to the buffer. @@ -51,16 +53,21 @@ public interface WindowBuffer<T> extends Serializable { /** * Returns all elements that are currently in the buffer. */ - Iterable<StreamRecord<T>> getElements(); + Iterable<StreamRecord<O>> getElements(); /** * Returns all elements that are currently in the buffer. This will unwrap the contained * elements from their {@link StreamRecord}. */ - Iterable<T> getUnpackedElements(); + Iterable<O> getUnpackedElements(); /** * Returns the number of elements that are currently in the buffer. */ int size(); + + /** + * Writes the contents of the window buffer to a {@link DataOutputView} for checkpointing. 
+ */ + void snapshot(DataOutputView out) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java index a4f4b27..1ca6350 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java @@ -18,39 +18,30 @@ package org.apache.flink.streaming.runtime.operators.windowing.buffers; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import java.io.IOException; import java.io.Serializable; /** * A factory for {@link WindowBuffer WindowBuffers}. * * @param <T> The type of elements that the created {@code WindowBuffer} can store. + * @param <O> The type of elements that the created buffer will return when asked for its contents. * @param <B> The type of the created {@code WindowBuffer} */ @Internal -public interface WindowBufferFactory<T, B extends WindowBuffer<T>> extends Serializable { +public interface WindowBufferFactory<T, O, B extends WindowBuffer<T, O>> extends Serializable { /** - * Sets the {@link RuntimeContext} that is used to initialize eventual user functions - * inside the created buffers. 
- */ - void setRuntimeContext(RuntimeContext ctx); - - /** - * Calls {@code open()} on eventual user functions inside the buffer. - */ - void open(Configuration config) throws Exception; - - /** - * Calls {@code close()} on eventual user functions inside the buffer. + * Creates a new {@code WindowBuffer}. */ - - void close() throws Exception; + B create(); /** - * Creates a new {@code WindowBuffer}. + * Restores a {@code WindowBuffer} from a previous snapshot written using + * {@link WindowBuffer#snapshot(DataOutputView)}. */ - B create(); + B restoreFromSnapshot(DataInputView in) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 42f452c..f6e3dcc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -33,8 +34,9 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; 
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; import org.junit.Assert; @@ -72,7 +74,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1; Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); - Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); + Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ReducingWindowBuffer.Factory); DataStream<Tuple2<String, Integer>> window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) @@ -94,7 +96,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2; Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); - Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory); } @Test @@ -118,7 +120,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1; 
Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); - Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); + Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ReducingWindowBuffer.Factory); DataStream<Tuple2<String, Integer>> window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) @@ -141,7 +143,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2; Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); - Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory); } @Test @@ -166,7 +168,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor); - Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ListWindowBuffer.Factory); DataStream<Tuple2<String, Integer>> window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) @@ -191,7 +193,46 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor); - 
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory); + } + + /** + * These tests ensure that a Fold buffer is used if possible + */ + @Test + @SuppressWarnings("rawtypes") + public void testFoldBuffer() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + + DummyFolder folder = new DummyFolder(); + + DataStream<Integer> window1 = source + .windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .fold(0, folder); + + OneInputTransformation<Tuple2<String, Integer>, Integer> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Integer> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator); + NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1; + Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); + Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof FoldingWindowBuffer.Factory); + + DataStream<Integer> window2 = source + .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .evictor(CountEvictor.of(13)) + .fold(0, folder); + + OneInputTransformation<Tuple2<String, Integer>, Integer> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window2.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Integer> operator2 = transform2.getOperator(); + Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator); + 
NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2; + Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger); + Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); + Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory); } // ------------------------------------------------------------------------ @@ -206,4 +247,14 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase return value1; } } + + public static class DummyFolder implements FoldFunction<Tuple2<String, Integer>, Integer> { + private static final long serialVersionUID = 1L; + + @Override + public Integer fold(Integer accumulator, Tuple2<String, Integer> value) throws Exception { + return accumulator; + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java index 571838f..c3a36dd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import 
org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; @@ -28,11 +29,10 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; -import org.junit.Assert; import org.junit.Test; import java.util.Comparator; @@ -51,15 +51,17 @@ public class EvictingNonKeyedWindowOperatorTest { final int WINDOW_SIZE = 4; final int WINDOW_SLIDE = 2; - EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>( + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), - new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(), + new ListWindowBuffer.Factory<Tuple2<String, Integer>>(inputType.createSerializer(new ExecutionConfig())), new ReduceIterableAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), CountTrigger.of(WINDOW_SLIDE), CountEvictor.of(WINDOW_SIZE)); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + operator.setInputType(inputType, new ExecutionConfig()); 
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java index c0e6ad4..406f3b0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java @@ -20,8 +20,12 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; @@ -35,8 +39,8 @@ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import 
org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -59,7 +63,7 @@ public class NonKeyedWindowOperatorTest { @SuppressWarnings("unchecked,rawtypes") private WindowBufferFactory windowBufferFactory; - public NonKeyedWindowOperatorTest(WindowBufferFactory<?, ?> windowBufferFactory) { + public NonKeyedWindowOperatorTest(WindowBufferFactory<?, ?, ?> windowBufferFactory) { this.windowBufferFactory = windowBufferFactory; } @@ -74,7 +78,7 @@ public class NonKeyedWindowOperatorTest { final int WINDOW_SIZE = 3; final int WINDOW_SLIDE = 1; - NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>( + NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>( SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), new TimeWindow.Serializer(), windowBufferFactory, @@ -150,7 +154,7 @@ public class NonKeyedWindowOperatorTest { final int WINDOW_SIZE = 3; - NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>( + NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>( 
TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), new TimeWindow.Serializer(), windowBufferFactory, @@ -224,7 +228,7 @@ public class NonKeyedWindowOperatorTest { final int WINDOW_SIZE = 3; - NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>( + NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), windowBufferFactory, @@ -298,7 +302,7 @@ public class NonKeyedWindowOperatorTest { final int WINDOW_SIZE = 4; - NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>( + NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), windowBufferFactory, @@ -360,26 +364,9 @@ public class NonKeyedWindowOperatorTest { public static class RichSumReducer extends RichReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; - private boolean openCalled = false; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - openCalled = true; - } - - @Override - public void close() throws Exception { - super.close(); - closeCalled.incrementAndGet(); - } - @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { - if (!openCalled) { - Assert.fail("Open was not called"); - } return new Tuple2<>(value2.f0, value1.f1 + value2.f1); } } @@ -389,9 +376,11 @@ public class NonKeyedWindowOperatorTest { @Parameterized.Parameters(name = "WindowBuffer = {0}") @SuppressWarnings("unchecked,rawtypes") - public static Collection<WindowBufferFactory[]> windowBuffers(){ - return Arrays.asList(new 
WindowBufferFactory[]{new PreAggregatingHeapWindowBuffer.Factory(new RichSumReducer())}, - new WindowBufferFactory[]{new HeapWindowBuffer.Factory()} + public static Collection<WindowBufferFactory[]> windowBuffers() { + TupleSerializer<Tuple2> tuple2TupleSerializer = new TupleSerializer<>(Tuple2.class, + new TypeSerializer<?>[]{StringSerializer.INSTANCE, IntSerializer.INSTANCE}); + return Arrays.asList(new WindowBufferFactory[]{new ReducingWindowBuffer.Factory(new SumReducer(), tuple2TupleSerializer)}, + new WindowBufferFactory[]{new ListWindowBuffer.Factory(tuple2TupleSerializer)} ); } http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index f73307c..a676757 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvic import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger} import org.apache.flink.streaming.api.windowing.windows.TimeWindow -import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer} +import org.apache.flink.streaming.runtime.operators.windowing.buffers.{ListWindowBuffer, ReducingWindowBuffer} import org.apache.flink.streaming.runtime.operators.windowing._ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase 
import org.apache.flink.util.Collector @@ -111,12 +111,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val operator1 = transform1.getOperator - assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]]) - val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]] + assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]]) + val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]] assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) assertTrue( - winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + winOperator1.getWindowBufferFactory.isInstanceOf[ReducingWindowBuffer.Factory[_]]) val window2 = source @@ -134,11 +134,11 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val operator2 = transform2.getOperator - assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]]) - val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]] + assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]]) + val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]] assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) - assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]]) } @Test @@ -161,12 +161,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val operator1 = transform1.getOperator - assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]) - val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]] + assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]]) + val winOperator1 
= operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]] assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger]) assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]]) assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows]) - assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]]) val window2 = source @@ -185,12 +185,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val operator2 = transform2.getOperator - assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]) - val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]] + assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]]) + val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]] assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]]) assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) - assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]]) } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/20884c07/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index d18a45e..4431106 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.checkpointing; +import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; @@ -280,6 +281,77 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { } @Test + public void testPreAggregatedFoldingTumblingTimeWindow() { + final int NUM_ELEMENTS_PER_KEY = 3000; + final int WINDOW_SIZE = 100; + final int NUM_KEYS = 1; + FailingSource.reset(); + + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getLeaderRPCPort()); + + env.setParallelism(PARALLELISM); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.enableCheckpointing(100); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.getConfig().disableSysoutLogging(); + + env + .addSource(new FailingSource(NUM_KEYS, + NUM_ELEMENTS_PER_KEY, + NUM_ELEMENTS_PER_KEY / 3)) + .rebalance() + .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS)) + .apply(new Tuple4<>(0L, 0L, 0L, new IntType(0)), + new FoldFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>>() { + @Override + public Tuple4<Long, Long, Long, IntType> fold(Tuple4<Long, Long, Long, IntType> accumulator, + Tuple2<Long, IntType> value) throws Exception { + accumulator.f0 = value.f0; + accumulator.f3 = new IntType(accumulator.f3.value + value.f1.value); + return accumulator; + } + }, + new RichAllWindowFunction<Tuple4<Long, Long, Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() { + + private boolean open = false; + + @Override + public void open(Configuration parameters) { + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void 
apply( + TimeWindow window, + Iterable<Tuple4<Long, Long, Long, IntType>> input, + Collector<Tuple4<Long, Long, Long, IntType>> out) { + + // validate that the function has been opened properly + assertTrue(open); + + for (Tuple4<Long, Long, Long, IntType> in: input) { + out.collect(new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f3)); + } + } + }) + .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); + + + tryExecute(env, "Tumbling Window Test"); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test public void testPreAggregatedSlidingTimeWindow() { final int NUM_ELEMENTS_PER_KEY = 3000; final int WINDOW_SIZE = 1000;
