[ 
https://issues.apache.org/jira/browse/FLINK-9668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qi quan updated FLINK-9668:
---------------------------
    Description: 
For example, I would like to compute statistics over a one-day window and 
output the current value of the indicator every minute.
 So I implemented my Trigger like this:
 onElement: check whether the ValueState already holds the next fire time; if 
not, register a timer for the next fire time and store it.
 onProcessingTime: register the next fire time (time + 1 min), update the 
ValueState, and return FIRE_AND_PURGE.
 (The amount of data in one day is too large; I don't want to keep such a 
large window state.)
{code:java}
/**
 * Processing-time trigger that fires and purges its window once per
 * {@code PERIOD}.
 *
 * <p>The timestamp of the pending timer is kept in the partitioned
 * {@code "fire-time"} state. The first element seen while that state is empty
 * registers a timer on the next {@code PERIOD} boundary; every timer that
 * matches the stored timestamp schedules its successor and returns
 * {@code FIRE_AND_PURGE}.
 */
public class PayAmountTrigger extends Trigger<Tuple2<String, String>, TimeWindow> {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(PayAmountTrigger.class);

    /** Interval between two firings, in milliseconds (one minute). */
    private static final long PERIOD = 1000L * 60;

    /** Descriptor of the state holding the timestamp of the pending timer. */
    ValueStateDescriptor<Long> stateDesc =
            new ValueStateDescriptor<>("fire-time", LongSerializer.INSTANCE);

    /**
     * Registers the first timer if no fire time is stored yet.
     *
     * @return always {@code CONTINUE}; elements never fire the window directly
     */
    @Override
    public TriggerResult onElement(Tuple2<String, String> tuple2, long l,
            TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        ValueState<Long> fireTimeState =
                triggerContext.getPartitionedState(stateDesc);
        if (fireTimeState.value() == null) {
            long now = triggerContext.getCurrentProcessingTime();
            // Round down to the current PERIOD boundary, then step to the next one.
            long nextFireTimestamp = now - (now % PERIOD) + PERIOD;
            triggerContext.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimeState.update(nextFireTimestamp);
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Fires and purges when {@code l} equals the stored fire time, after
     * scheduling the next timer one {@code PERIOD} later.
     *
     * @return {@code FIRE_AND_PURGE} for the expected timer, otherwise
     *         {@code CONTINUE}
     */
    @Override
    public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        ValueState<Long> fireTimeState =
                triggerContext.getPartitionedState(stateDesc);
        Long expected = fireTimeState.value();
        // Null-safe: a timer that arrives while no fire time is stored is ignored
        // instead of raising a NullPointerException.
        if (expected == null || expected.longValue() != l) {
            return TriggerResult.CONTINUE;
        }
        long nextFireTimestamp = l + PERIOD;
        // update() replaces the stored value, so no clear() is needed first.
        fireTimeState.update(nextFireTimestamp);
        triggerContext.registerProcessingTimeTimer(nextFireTimestamp);
        return TriggerResult.FIRE_AND_PURGE;
    }

    /** Event time is not used by this trigger. */
    @Override
    public TriggerResult onEventTime(long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /** Deletes the pending timer, if any, and drops the stored fire time. */
    @Override
    public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
            throws Exception {
        System.out.println("PayAmountTrigger_clear");
        ValueState<Long> fireTimeState =
                triggerContext.getPartitionedState(stateDesc);
        Long timestamp = fireTimeState.value();
        // The state may be empty; unboxing null here would throw.
        if (timestamp != null) {
            triggerContext.deleteProcessingTimeTimer(timestamp);
        }
        fireTimeState.clear();
    }
}{code}
Then I found out that if no data arrives during a given minute, 
onProcessingTime is not executed and the trigger time is missed forever.
 I then dug through the code and found the following in 
WindowOperator.onProcessingTime:
{code:java}
ACC contents = null;
if (windowState != null) {
   contents = windowState.get();
}

if (contents != null) {
   TriggerResult triggerResult = 
triggerContext.onProcessingTime(timer.getTimestamp());
   if (triggerResult.isFire()) {
      emitWindowContents(triggerContext.window, contents);
   }
   if (triggerResult.isPurge()) {
      windowState.clear();
   }
}{code}
This means that if no data arrives during that minute and I have also purged 
the window data, triggerContext.onProcessingTime will never be executed. I 
think this is a bug in Flink.

  was:
For example, I would like to compute statistics over a one-day window and 
output the current value of the indicator every minute.
So I implemented my Trigger like this:
onElement: check whether the ValueState already holds the next fire time; if 
not, register a timer for the next fire time and store it.
onProcessingTime: register the next fire time (time + 1 min), update the 
ValueState, and return FIRE_AND_PURGE.
(The amount of data in one day is too large; I don't want to keep such a large 
window state.)
{code:java}
/**
 * Processing-time trigger that fires and purges its window once per
 * {@code PERIOD}.
 *
 * <p>The timestamp of the pending timer is kept in the partitioned
 * {@code "fire-time"} state. The first element seen while that state is empty
 * registers a timer on the next {@code PERIOD} boundary; every timer that
 * matches the stored timestamp schedules its successor and returns
 * {@code FIRE_AND_PURGE}.
 */
public class PayAmountTrigger extends Trigger<Tuple2<String, String>, TimeWindow> {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(PayAmountTrigger.class);

    /** Interval between two firings, in milliseconds (five seconds). */
    private static final long PERIOD = 1000L * 5;

    /** Descriptor of the state holding the timestamp of the pending timer. */
    ValueStateDescriptor<Long> stateDesc =
            new ValueStateDescriptor<>("fire-time", LongSerializer.INSTANCE);

    /**
     * Registers the first timer if no fire time is stored yet.
     *
     * @return always {@code CONTINUE}; elements never fire the window directly
     */
    @Override
    public TriggerResult onElement(Tuple2<String, String> tuple2, long l,
            TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        ValueState<Long> fireTimeState =
                triggerContext.getPartitionedState(stateDesc);
        if (fireTimeState.value() == null) {
            long now = triggerContext.getCurrentProcessingTime();
            // Round down to the current PERIOD boundary, then step to the next one.
            long nextFireTimestamp = now - (now % PERIOD) + PERIOD;
            triggerContext.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimeState.update(nextFireTimestamp);
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Fires and purges when {@code l} equals the stored fire time, after
     * scheduling the next timer one {@code PERIOD} later.
     *
     * @return {@code FIRE_AND_PURGE} for the expected timer, otherwise
     *         {@code CONTINUE}
     */
    @Override
    public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        ValueState<Long> fireTimeState =
                triggerContext.getPartitionedState(stateDesc);
        Long expected = fireTimeState.value();
        // Null-safe: a timer that arrives while no fire time is stored is ignored
        // instead of raising a NullPointerException.
        if (expected == null || expected.longValue() != l) {
            return TriggerResult.CONTINUE;
        }
        long nextFireTimestamp = l + PERIOD;
        // update() replaces the stored value, so no clear() is needed first.
        fireTimeState.update(nextFireTimestamp);
        triggerContext.registerProcessingTimeTimer(nextFireTimestamp);
        return TriggerResult.FIRE_AND_PURGE;
    }

    /** Event time is not used by this trigger. */
    @Override
    public TriggerResult onEventTime(long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /** Deletes the pending timer, if any, and drops the stored fire time. */
    @Override
    public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
            throws Exception {
        System.out.println("PayAmountTrigger_clear");
        ValueState<Long> fireTimeState =
                triggerContext.getPartitionedState(stateDesc);
        Long timestamp = fireTimeState.value();
        // The state may be empty; unboxing null here would throw.
        if (timestamp != null) {
            triggerContext.deleteProcessingTimeTimer(timestamp);
        }
        fireTimeState.clear();
    }
}{code}
Then I found out that if no data arrives during a given minute, 
onProcessingTime is not executed and the trigger time is missed forever.
I then dug through the code and found the following in 
WindowOperator.onProcessingTime:
{code:java}
ACC contents = null;
if (windowState != null) {
   contents = windowState.get();
}

if (contents != null) {
   TriggerResult triggerResult = 
triggerContext.onProcessingTime(timer.getTimestamp());
   if (triggerResult.isFire()) {
      emitWindowContents(triggerContext.window, contents);
   }
   if (triggerResult.isPurge()) {
      windowState.clear();
   }
}{code}
This means that if no data arrives during that minute and I have also purged 
the window data, triggerContext.onProcessingTime will never be executed. I 
think this is a bug in Flink.


> In some cases Trigger.onProcessingTime doesn't execute
> ------------------------------------------------------
>
>                 Key: FLINK-9668
>                 URL: https://issues.apache.org/jira/browse/FLINK-9668
>             Project: Flink
>          Issue Type: Bug
>            Reporter: qi quan
>            Priority: Major
>
> For example, I would like to compute statistics over a one-day window and 
> output the current value of the indicator every minute.
>  So I implemented my Trigger like this:
>  onElement: check whether the ValueState already holds the next fire time; 
> if not, register a timer for the next fire time and store it.
>  onProcessingTime: register the next fire time (time + 1 min), update the 
> ValueState, and return FIRE_AND_PURGE.
>  (The amount of data in one day is too large; I don't want to keep such a 
> large window state.)
> {code:java}
> public class PayAmountTrigger extends Trigger<Tuple2<String, String>, 
> TimeWindow> {
>     private static final Logger LOGGER = 
> LoggerFactory.getLogger(PayAmountTrigger.class);
>     private static final Long PERIOD = 1000L * 60;
>     ValueStateDescriptor<Long> stateDesc = new 
> ValueStateDescriptor("fire-time", LongSerializer.INSTANCE);
>     @Override
>     public TriggerResult onElement(Tuple2<String, String> tuple2, long l, 
> TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>         ValueState<Long> firstTimeState = 
> triggerContext.getPartitionedState(stateDesc);
>         long time = triggerContext.getCurrentProcessingTime();
>         if (firstTimeState.value() == null) {
>             long start = time - (time % PERIOD);
>             long nextFireTimestamp = start + PERIOD;
>             triggerContext.registerProcessingTimeTimer(nextFireTimestamp);
>             firstTimeState.update(nextFireTimestamp);
>             return TriggerResult.CONTINUE;
>         }
>         return TriggerResult.CONTINUE;
>     }
>     @Override
>     public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, 
> TriggerContext triggerContext) throws Exception {
>         ValueState<Long> state = 
> triggerContext.getPartitionedState(stateDesc);
>         if (state.value().equals(l)) {
>             state.clear();
>             state.update(l + PERIOD);
>             triggerContext.registerProcessingTimeTimer(l + PERIOD);
>             return TriggerResult.FIRE_AND_PURGE;
>         }
>         return TriggerResult.CONTINUE;
>     }
>     @Override
>     public TriggerResult onEventTime(long l, TimeWindow timeWindow, 
> TriggerContext triggerContext) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>     @Override
>     public void clear(TimeWindow timeWindow, TriggerContext triggerContext) 
> throws Exception {
>         System.out.println("PayAmountTrigger_clear");
>         ValueState<Long> firstTimeState = 
> triggerContext.getPartitionedState(stateDesc);
>         long timestamp = firstTimeState.value();
>         triggerContext.deleteProcessingTimeTimer(timestamp);
>         firstTimeState.clear();
>     }
> }{code}
> Then I found out that if no data arrives during a given minute, 
> onProcessingTime is not executed and the trigger time is missed forever.
>  I then dug through the code and found the following in 
> WindowOperator.onProcessingTime:
> {code:java}
> ACC contents = null;
> if (windowState != null) {
>    contents = windowState.get();
> }
> if (contents != null) {
>    TriggerResult triggerResult = 
> triggerContext.onProcessingTime(timer.getTimestamp());
>    if (triggerResult.isFire()) {
>       emitWindowContents(triggerContext.window, contents);
>    }
>    if (triggerResult.isPurge()) {
>       windowState.clear();
>    }
> }{code}
> This means that if no data arrives during that minute and I have also 
> purged the window data, triggerContext.onProcessingTime will never be 
> executed. I think this is a bug in Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to