xljtswf created FLINK-30208:
-------------------------------
Summary: avoid unconditional state update in CountTrigger#onElement
Key: FLINK-30208
URL: https://issues.apache.org/jira/browse/FLINK-30208
Project: Flink
Issue Type: Improvement
Components: API / DataStream
Reporter: xljtswf
In current CountTrigger#onElement, when one element is received, the state is
updated unconditionally, and we then fetch the state again to check whether we
need to clear the state. This implies we may update the state 2 times to
process one element. I suppose to make following simplification:
public TriggerResult onElement(Object element, long timestamp, W window,
TriggerContext ctx)
throws Exception {
TriggerResult triggerResult;
if (maxCount > 1) {
ReducingState<Long> countState = ctx.getPartitionedState(stateDesc);
Long currentCount = countState.get();
if (currentCount == null || currentCount < maxCount - 1) {
countState.add(1L);
triggerResult = TriggerResult.CONTINUE;
} else {
countState.clear();
triggerResult = TriggerResult.FIRE;
}
} else {
triggerResult = TriggerResult.FIRE;
}
return triggerResult;
}
If this is approved, I will make a pr then.
Thanks!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)