[ https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633419#comment-15633419 ]
ASF GitHub Bot commented on FLINK-4174:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2736#discussion_r86388455
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java ---
@@ -58,6 +61,458 @@
// For counting if close() is called the correct number of times on the SumReducer
+ /**
+ * Tests CountEvictor evictAfter behavior
+ * @throws Exception
+ */
+ @Test
+ public void testCountEvictorEvictAfter() throws Exception {
+ AtomicInteger closeCalled = new AtomicInteger(0);
+ final int WINDOW_SIZE = 4;
+ final int TRIGGER_COUNT = 2;
+ final boolean EVICT_AFTER = true;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));
+
+ ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+
+ EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+ GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
+ CountTrigger.of(TRIGGER_COUNT),
+ CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
+ 0);
+
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
--- End diff --
As it is now, this test never seems to exercise eviction: eviction happens
after the window fires, so its effect would only show up in a later firing.
Adding these lines would exercise the eviction:
```
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
```
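This checks that the window, which held 6 elements at the previous firing, was
truncated back to 4 afterwards: the two new elements bring it to 6 again, so
the expected sum is 6 rather than 8.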
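To make the arithmetic concrete, here is a minimal plain-Java model of a count trigger combined with a count evictor that evicts after firing. It is only a sketch and does not use the Flink classes: `EvictAfterSketch`, `simulate`, and its parameters are hypothetical names, and it assumes that the trigger counter resets on each firing and that eviction drops elements from the front of the window.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not Flink code) of count trigger + count evictor with evict-after. */
public class EvictAfterSketch {

    /**
     * Feeds elementCount elements of value 1 into a single keyed window and
     * returns the sum emitted at each firing.
     */
    static List<Integer> simulate(int elementCount, int triggerCount, int maxSize) {
        List<Integer> window = new ArrayList<>();
        List<Integer> firings = new ArrayList<>();
        int sinceLastFire = 0;

        for (int i = 0; i < elementCount; i++) {
            window.add(1);
            sinceLastFire++;
            if (sinceLastFire == triggerCount) {
                sinceLastFire = 0; // assumption: the counter resets on firing

                // The window function sees the contents before eviction.
                int sum = 0;
                for (int v : window) {
                    sum += v;
                }
                firings.add(sum);

                // Evict after firing: drop from the front until maxSize elements remain.
                while (window.size() > maxSize) {
                    window.remove(0);
                }
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // Six key2 elements, as in the test in the diff: eviction is never observed.
        System.out.println(simulate(6, 2, 4)); // prints [2, 4, 6]
        // Two more elements: the last firing sums 6, not 8, which shows the truncation.
        System.out.println(simulate(8, 2, 4)); // prints [2, 4, 6, 6]
    }
}
```

With the values from the test (trigger count 2, window size 4), the fourth firing is the first one whose result depends on the eviction having happened.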
> Enhance Window Evictor
> ----------------------
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: vishnu viswanath
> Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the
> beginning). To do this, the Evictor must go through the list of elements and
> remove the elements that have to be evicted, instead of the current approach
> of returning the count of elements to be removed from the beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement :
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]
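The two points in the issue description could be sketched roughly as follows. This is an illustration only, not the interface from the design document or FLIP page: `SimpleEvictor`, `DropNegatives`, and `fire` are hypothetical names, and the window is modelled as a plain `List<Integer>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class EvictorSketch {

    /** Hypothetical, simplified evictor contract; not the Flink interface. */
    interface SimpleEvictor<T> {
        void evictBefore(Iterable<T> elements, int size);
        void evictAfter(Iterable<T> elements, int size);
    }

    /** Removes negative values wherever they occur, before the window function runs. */
    static class DropNegatives implements SimpleEvictor<Integer> {
        @Override
        public void evictBefore(Iterable<Integer> elements, int size) {
            // Walk the elements and remove matches in place, in any position.
            Iterator<Integer> it = elements.iterator();
            while (it.hasNext()) {
                if (it.next() < 0) {
                    it.remove();
                }
            }
        }

        @Override
        public void evictAfter(Iterable<Integer> elements, int size) {
            // Nothing to evict after the window function in this example.
        }
    }

    /** Models one firing: evict before, apply a sum as the window function, evict after. */
    static int fire(List<Integer> window, SimpleEvictor<Integer> evictor) {
        evictor.evictBefore(window, window.size());
        int sum = 0;
        for (int v : window) {
            sum += v;
        }
        evictor.evictAfter(window, window.size());
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(Arrays.asList(3, -1, 4, -5, 2));
        System.out.println(fire(window, new DropNegatives())); // prints 9
        System.out.println(window); // prints [3, 4, 2]
    }
}
```

Handing the evictor an iterable instead of asking it for a count is what lets it remove elements from the middle of the window.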