[
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633418#comment-15633418
]
ASF GitHub Bot commented on FLINK-4174:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2736#discussion_r86385660
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
---
@@ -35,23 +37,63 @@
private static final long serialVersionUID = 1L;
private final long windowSize;
+ private final boolean doEvictAfter;
public TimeEvictor(long windowSize) {
this.windowSize = windowSize;
+ this.doEvictAfter = false;
+ }
+
+ public TimeEvictor(long windowSize, boolean doEvictAfter) {
+ this.windowSize = windowSize;
+ this.doEvictAfter = doEvictAfter;
}
+
@Override
- public int evict(Iterable<StreamRecord<Object>> elements, int size, W
window) {
- int toEvict = 0;
- long currentTime = Iterables.getLast(elements).getTimestamp();
+ public void evictBefore(Iterable<TimestampedValue<Object>> elements,
int size, W window, EvictorContext ctx) {
+ if(!doEvictAfter) {
+ evict(elements,size,ctx);
+ }
+ }
+
+ @Override
+ public void evictAfter(Iterable<TimestampedValue<Object>> elements, int
size, W window, EvictorContext ctx) {
+ if(doEvictAfter) {
+ evict(elements,size,ctx);
+ }
+ }
+
+ private void evict(Iterable<TimestampedValue<Object>> elements, int
size, EvictorContext ctx) {
+ long currentTime = getMaxTimestamp(elements);
+
+ if (currentTime < 0) {
+ //we don't evict any element if timestamp is not set in
the record
+ return;
+ }
long evictCutoff = currentTime - windowSize;
- for (StreamRecord<Object> record: elements) {
- if (record.getTimestamp() > evictCutoff) {
- break;
+
+ for (Iterator<TimestampedValue<Object>> iterator =
elements.iterator(); iterator.hasNext(); ) {
+ TimestampedValue<Object> record = iterator.next();
+ if (record.getTimestamp() <= evictCutoff) {
+ iterator.remove();
+ }
+ }
+ }
+
+ /**
+ * @param elements The elements currently in the pane.
+ * @return The maximum value of timestamp among the elements
+ */
+ private long getMaxTimestamp(Iterable<TimestampedValue<Object>>
elements) {
+ long currentTime = Long.MIN_VALUE;
+ for (Iterator<TimestampedValue<Object>> iterator =
elements.iterator(); iterator.hasNext();){
+ TimestampedValue<Object> record = iterator.next();
+ if (record.getTimestamp() > currentTime) {
--- End diff --
This could be more succinctly expressed as
`currentTime = Math.max(currentTime, record.getTimestamp());`
> Enhance Window Evictor
> ----------------------
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: vishnu viswanath
> Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the
> beginning). To do this, the Evictor must go through the list of elements and
> remove the elements that have to be evicted, instead of the current approach
> of returning the count of elements to be removed from the beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement :
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)