[
https://issues.apache.org/jira/browse/FLINK-9668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
qi quan updated FLINK-9668:
---------------------------
Description:
For example, I would like to achieve a statistical window of one day, and I
want to output the result of the indicator every 1 minute.
So I implemented my Trigger like this.
onElement: check if valuestate has stored the nextfiretime, register the
nextfiretime,
onProcessingTime: Registers the nextfiretime (time+1min),update valuestate,
return FIRE_AND_PURGE.
(The amount of data in one day is too large. I don't want to store such a
large window state.)
{code:java}
public class PayAmountTrigger extends Trigger<Tuple2<String, String>,
TimeWindow> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PayAmountTrigger.class);
private static final Long PERIOD = 1000L * 60;
ValueStateDescriptor<Long> stateDesc = new
ValueStateDescriptor("fire-time", LongSerializer.INSTANCE);
@Override
public TriggerResult onElement(Tuple2<String, String> tuple2, long l,
TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
ValueState<Long> firstTimeState =
triggerContext.getPartitionedState(stateDesc);
long time = triggerContext.getCurrentProcessingTime();
if (firstTimeState.value() == null) {
long start = time - (time % PERIOD);
long nextFireTimestamp = start + PERIOD;
triggerContext.registerProcessingTimeTimer(nextFireTimestamp);
firstTimeState.update(nextFireTimestamp);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
TriggerContext triggerContext) throws Exception {
ValueState<Long> state = triggerContext.getPartitionedState(stateDesc);
if (state.value().equals(l)) {
state.clear();
state.update(l + PERIOD);
triggerContext.registerProcessingTimeTimer(l + PERIOD);
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long l, TimeWindow timeWindow,
TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
throws Exception {
System.out.println("PayAmountTrigger_clear");
ValueState<Long> firstTimeState =
triggerContext.getPartitionedState(stateDesc);
long timestamp = firstTimeState.value();
triggerContext.deleteProcessingTimeTimer(timestamp);
firstTimeState.clear();
}
}{code}
Then I found out that if there is no data in this minute, onProcessingTime will
not be executed and you will miss the trigger time forever.
Then I dig through the code and find in the WindowOperator.onProcessingTime
{code:java}
ACC contents = null;
if (windowState != null) {
contents = windowState.get();
}
if (contents != null) {
TriggerResult triggerResult =
triggerContext.onProcessingTime(timer.getTimestamp());
if (triggerResult.isFire()) {
emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}{code}
This means that if no data comes up for this minute,And I also purge the window
data, triggerContext.onProcessingTime will never be executed.I think this is a
bug in flink.
was:
For example, I would like to achieve a statistical window of one day, and I
want to output the result of the indicator every 1 minute.
So I implemented my Trigger like this.
onElement: check if valuestate has stored the nextfiretime, register the
nextfiretime,
onProcessingTime: Registers the nextfiretime (time+1min),update valuestate,
return FIRE_AND_PURGE.
(The amount of data in one day is too large. I don't want to store such a large
window state.)
{code:java}
public class PayAmountTrigger extends Trigger<Tuple2<String, String>,
TimeWindow> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PayAmountTrigger.class);
private static final Long PERIOD = 1000L * 5;
ValueStateDescriptor<Long> stateDesc = new
ValueStateDescriptor("fire-time", LongSerializer.INSTANCE);
@Override
public TriggerResult onElement(Tuple2<String, String> tuple2, long l,
TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
ValueState<Long> firstTimeState =
triggerContext.getPartitionedState(stateDesc);
long time = triggerContext.getCurrentProcessingTime();
if (firstTimeState.value() == null) {
long start = time - (time % PERIOD);
long nextFireTimestamp = start + PERIOD;
triggerContext.registerProcessingTimeTimer(nextFireTimestamp);
firstTimeState.update(nextFireTimestamp);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
TriggerContext triggerContext) throws Exception {
ValueState<Long> state = triggerContext.getPartitionedState(stateDesc);
if (state.value().equals(l)) {
state.clear();
state.update(l + PERIOD);
triggerContext.registerProcessingTimeTimer(l + PERIOD);
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long l, TimeWindow timeWindow,
TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
throws Exception {
System.out.println("PayAmountTrigger_clear");
ValueState<Long> firstTimeState =
triggerContext.getPartitionedState(stateDesc);
long timestamp = firstTimeState.value();
triggerContext.deleteProcessingTimeTimer(timestamp);
firstTimeState.clear();
}
}{code}
Then I found out that if there is no data in this minute, onProcessingTime will
not be executed and you will miss the trigger time forever.
Then I dig through the code and find in the WindowOperator.onProcessingTime
{code:java}
ACC contents = null;
if (windowState != null) {
contents = windowState.get();
}
if (contents != null) {
TriggerResult triggerResult =
triggerContext.onProcessingTime(timer.getTimestamp());
if (triggerResult.isFire()) {
emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}{code}
This means that if no data comes up for this minute,And I also purge the window
data, triggerContext.onProcessingTime will never be executed.I think this is a
bug in flink.
> In some case Trigger.onProcessingTime don't exectue
> ---------------------------------------------------
>
> Key: FLINK-9668
> URL: https://issues.apache.org/jira/browse/FLINK-9668
> Project: Flink
> Issue Type: Bug
> Reporter: qi quan
> Priority: Major
>
> For example, I would like to achieve a statistical window of one day, and I
> want to output the result of the indicator every 1 minute.
> So I implemented my Trigger like this.
> onElement: check if valuestate has stored the nextfiretime, register the
> nextfiretime,
> onProcessingTime: Registers the nextfiretime (time+1min),update valuestate,
> return FIRE_AND_PURGE.
> (The amount of data in one day is too large. I don't want to store such a
> large window state.)
> {code:java}
> public class PayAmountTrigger extends Trigger<Tuple2<String, String>,
> TimeWindow> {
> private static final Logger LOGGER =
> LoggerFactory.getLogger(PayAmountTrigger.class);
> private static final Long PERIOD = 1000L * 60;
> ValueStateDescriptor<Long> stateDesc = new
> ValueStateDescriptor("fire-time", LongSerializer.INSTANCE);
> @Override
> public TriggerResult onElement(Tuple2<String, String> tuple2, long l,
> TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
> ValueState<Long> firstTimeState =
> triggerContext.getPartitionedState(stateDesc);
> long time = triggerContext.getCurrentProcessingTime();
> if (firstTimeState.value() == null) {
> long start = time - (time % PERIOD);
> long nextFireTimestamp = start + PERIOD;
> triggerContext.registerProcessingTimeTimer(nextFireTimestamp);
> firstTimeState.update(nextFireTimestamp);
> return TriggerResult.CONTINUE;
> }
> return TriggerResult.CONTINUE;
> }
> @Override
> public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
> TriggerContext triggerContext) throws Exception {
> ValueState<Long> state =
> triggerContext.getPartitionedState(stateDesc);
> if (state.value().equals(l)) {
> state.clear();
> state.update(l + PERIOD);
> triggerContext.registerProcessingTimeTimer(l + PERIOD);
> return TriggerResult.FIRE_AND_PURGE;
> }
> return TriggerResult.CONTINUE;
> }
> @Override
> public TriggerResult onEventTime(long l, TimeWindow timeWindow,
> TriggerContext triggerContext) throws Exception {
> return TriggerResult.CONTINUE;
> }
> @Override
> public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
> throws Exception {
> System.out.println("PayAmountTrigger_clear");
> ValueState<Long> firstTimeState =
> triggerContext.getPartitionedState(stateDesc);
> long timestamp = firstTimeState.value();
> triggerContext.deleteProcessingTimeTimer(timestamp);
> firstTimeState.clear();
> }
> }{code}
> Then I found out that if there is no data in this minute, onProcessingTime
> will not be executed and you will miss the trigger time forever.
> Then I dig through the code and find in the WindowOperator.onProcessingTime
> {code:java}
> ACC contents = null;
> if (windowState != null) {
> contents = windowState.get();
> }
> if (contents != null) {
> TriggerResult triggerResult =
> triggerContext.onProcessingTime(timer.getTimestamp());
> if (triggerResult.isFire()) {
> emitWindowContents(triggerContext.window, contents);
> }
> if (triggerResult.isPurge()) {
> windowState.clear();
> }
> }{code}
> This means that if no data comes up for this minute,And I also purge the
> window data, triggerContext.onProcessingTime will never be executed.I think
> this is a bug in flink.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)