[ https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15635780#comment-15635780 ]
ASF GitHub Bot commented on FLINK-4174:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2736#discussion_r86510390
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---
@@ -273,32 +284,93 @@ public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
return;
}
- TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
- if (triggerResult.isFire()) {
- fire(context.window, contents);
- }
+ TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ fire(context.window, contents, windowState);
+ }
if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
cleanup(context.window, windowState, mergingWindows);
}
}
- private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
+ private void fire(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
// Work around type system restrictions...
- int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
-
- FluentIterable<IN> projectedContents = FluentIterable
+ FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
.from(contents)
- .skip(toEvict)
- .transform(new Function<StreamRecord<IN>, IN>() {
+ .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
+ @Override
+ public TimestampedValue<IN> apply(StreamRecord<IN> input) {
+ return new TimestampedValue<>(input.getValue(), input.getTimestamp());
+ }
+ });
+ evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
+
+ FluentIterable<IN> projectedContents = recordsWithTimestamp
+ .transform(new Function<TimestampedValue<IN>, IN>() {
@Override
- public IN apply(StreamRecord<IN> input) {
+ public IN apply(TimestampedValue<IN> input) {
return input.getValue();
}
});
+
userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
+ evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
+
+
+ //work around to fix FLINK-4369, remove the evicted elements from the windowState.
+ //this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
+ windowState.clear();
+ for(TimestampedValue<IN> record : recordsWithTimestamp) {
+ if (record.getTimestamp() < 0) {
--- End diff --
I think that's the way to go, yes.
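The `fire()` sequence in the diff (evict before, apply the window function, evict after, then clear `windowState` and re-add what is left, because `ListState` is an `AppendingState` with no per-element removal) can be illustrated without Flink. The sketch below is a hypothetical, simplified model, not the PR's code: `TimestampedValue`, `AppendOnlyListState` and `Evictor` are stand-ins defined here rather than the Flink classes, the `evictBefore`/`evictAfter` methods drop the size argument used in the diff, and evictors are assumed to remove elements directly from a mutable list. It does not model how the PR decides which records to re-add, since the diff is cut off at the `getTimestamp() < 0` check.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class FireWithEvictionSketch {

    // Hypothetical stand-in for a value paired with its timestamp (not Flink's class).
    static final class TimestampedValue<T> {
        final T value;
        final long timestamp;

        TimestampedValue(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical append-only state: add, read, clear, but no way to remove one element.
    static final class AppendOnlyListState<T> {
        private final List<T> backing = new ArrayList<>();

        void add(T element) { backing.add(element); }

        Iterable<T> get() { return new ArrayList<>(backing); } // a copy, so readers cannot mutate state

        void clear() { backing.clear(); }
    }

    // Hypothetical evictor: may remove elements at any position from the list it is given.
    interface Evictor<T> {
        void evictBefore(List<TimestampedValue<T>> elements);

        void evictAfter(List<TimestampedValue<T>> elements);
    }

    static <T, R> R fire(AppendOnlyListState<TimestampedValue<T>> windowState,
                         Evictor<T> evictor, Function<List<T>, R> windowFunction) {
        // Copy the window contents into a mutable list the evictor can edit.
        List<TimestampedValue<T>> records = new ArrayList<>();
        for (TimestampedValue<T> record : windowState.get()) {
            records.add(record);
        }
        evictor.evictBefore(records);

        // Run the window function on the plain values that survived evictBefore.
        List<T> values = new ArrayList<>();
        for (TimestampedValue<T> record : records) {
            values.add(record.value);
        }
        R result = windowFunction.apply(values);
        evictor.evictAfter(records);

        // No per-element remove on the state: clear it and re-add the survivors.
        windowState.clear();
        for (TimestampedValue<T> record : records) {
            windowState.add(record);
        }
        return result;
    }

    // Returns "<window result> <values left in state>" for a small example window.
    public static String demo() {
        AppendOnlyListState<TimestampedValue<Integer>> state = new AppendOnlyListState<>();
        state.add(new TimestampedValue<>(1, 5L));
        state.add(new TimestampedValue<>(2, 12L));
        state.add(new TimestampedValue<>(3, 7L));
        state.add(new TimestampedValue<>(4, 20L));

        Evictor<Integer> evictor = new Evictor<Integer>() {
            @Override
            public void evictBefore(List<TimestampedValue<Integer>> elements) {
                // Drop records with timestamp < 10, wherever they sit in the list.
                Iterator<TimestampedValue<Integer>> it = elements.iterator();
                while (it.hasNext()) {
                    if (it.next().timestamp < 10) {
                        it.remove();
                    }
                }
            }

            @Override
            public void evictAfter(List<TimestampedValue<Integer>> elements) {
                // Drop values greater than 3 once the window function has seen them.
                elements.removeIf(r -> r.value > 3);
            }
        };

        Integer sum = fire(state, evictor, values -> values.stream().mapToInt(Integer::intValue).sum());
        List<Integer> survivors = new ArrayList<>();
        for (TimestampedValue<Integer> record : state.get()) {
            survivors.add(record.value);
        }
        return sum + " " + survivors;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // window result, then what remains in state
    }
}
```

Running `main` prints `6 [2]`: `evictBefore` removes the records with timestamps 5 and 7 (first and third position), the window function sums 2 + 4, and `evictAfter` then drops 4, so only 2 is written back to the state.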
> Enhance Window Evictor
> ----------------------
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: vishnu viswanath
> Assignee: vishnu viswanath
>
> Enhance the current functionality of the Evictor as described in this [design document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow elements to be evicted from the window in any order (not only from the
> beginning). To do this, the Evictor must iterate over the window's elements and
> remove the ones that should be evicted, instead of the current approach of
> returning the number of elements to remove from the beginning.
> - Allow eviction to happen before and/or after the window function is applied.
> FLIP page for this enhancement: [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]
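The first point in the issue description, evicting in any order instead of returning a count, matters when window contents are not sorted by timestamp. A minimal sketch, using plain `List<Long>` timestamps in place of window elements and hypothetical method names `evictFirstN` and `evictOlderThan` (neither is a Flink API), contrasts the two styles:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class EvictorStylesSketch {

    // Count-based style (like the removed lines of the diff): the evictor only returns
    // how many elements to drop, and the caller skips that many from the front.
    static List<Long> evictFirstN(List<Long> timestamps, int toEvict) {
        int from = Math.min(toEvict, timestamps.size());
        return new ArrayList<>(timestamps.subList(from, timestamps.size()));
    }

    // Iterator style (as described in the issue): the evictor walks the elements
    // and removes each one that should go, regardless of its position.
    static void evictOlderThan(List<Long> timestamps, long cutoff) {
        Iterator<Long> it = timestamps.iterator();
        while (it.hasNext()) {
            if (it.next() < cutoff) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        // Out-of-order window contents: the two "old" elements are not at the front.
        List<Long> window = Arrays.asList(15L, 3L, 20L, 8L);

        // Dropping the first two removes 15 (too new) and keeps 8 (too old).
        System.out.println(evictFirstN(window, 2));

        // Removing by predicate keeps exactly the elements at or after the cutoff.
        List<Long> mutable = new ArrayList<>(window);
        evictOlderThan(mutable, 10L);
        System.out.println(mutable);
    }
}
```

`main` prints `[20, 8]` for the count-based version, which drops the newer element 15 and keeps the old element 8, and `[15, 20]` for the iterator-based version. `Iterator.remove()` needs a modifiable list, which is why the sketch copies the fixed-size `Arrays.asList` result into an `ArrayList` before evicting.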
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)