[
https://issues.apache.org/jira/browse/FLINK-13383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16892611#comment-16892611
]
jinguishi edited comment on FLINK-13383 at 7/25/19 10:00 AM:
-------------------------------------------------------------
all right
was (Author: shijingui):
all rights
> Customize the problem in the trigger
> ------------------------------------
>
> Key: FLINK-13383
> URL: https://issues.apache.org/jira/browse/FLINK-13383
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.8.0
> Environment: The development environment is idea, the flink version
> is 1.8
> Reporter: jinguishi
> Priority: Blocker
> Attachments: WX20190723-174236.png, WechatIMG2.png
>
>
> Using a Tumbling time window, I created a time-based and counter trigger. The
> parameters in the onElement method TriggerContext, ctx.getCurrentWatermark(),
> get negative values,
> There are screenshots in the attachment。
> code show as below
> {code:java}
> public class CountTrigger extends Trigger<Object, TimeWindow> {
> private static final long serialVersionUID = 1L;
> private CountTrigger(int count) {
> this.threshold = count;
> }
> private int count = 0;
> private int threshold;
> @Override
> public TriggerResult onElement(Object element, long timestamp, TimeWindow
> window, TriggerContext ctx) {
> long watermark = ctx.getCurrentWatermark();
> ctx.registerEventTimeTimer(window.maxTimestamp());
> if (count > threshold) {
> count = 0;
> return TriggerResult.FIRE;
> } else {
> count++;
> }
> System.out.println("onElement: " + element);
> return TriggerResult.CONTINUE;
> }
> @Override
> public TriggerResult onEventTime(long time, TimeWindow window,
> Trigger.TriggerContext ctx) throws Exception {
> return TriggerResult.CONTINUE;
> }
> @Override
> public TriggerResult onProcessingTime(long time, TimeWindow window,
> Trigger.TriggerContext ctx) throws Exception {
> return TriggerResult.FIRE;
> }
> @Override
> public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws
> Exception {
> ctx.deleteProcessingTimeTimer(window.maxTimestamp());
> }
> @Override
> public String toString() {
> return "CountTrigger";
> }
> public static <W extends Window> CountTrigger of(int threshold) {
> return new CountTrigger(threshold);
> }
> }
> {code}
> {code:java}
> public TriggerResult onElement(Object element, long timestamp, W window,
> TriggerContext ctx) throws Exception {
> ReducingState<Long> fireTimestamp =
> ctx.getPartitionedState(stateDesc);
> timestamp = ctx.getCurrentWatermark();
> long end = window.maxTimestamp();
> System.out.println(element + " " + timestamp + " " +
> window.maxTimestamp() + " " + fireTimestamp.get());
> if (fireTimestamp.get() == null) {
> long start = timestamp - (timestamp % interval);
> long nextFireTimestamp = start + interval;
> ctx.registerEventTimeTimer(nextFireTimestamp);
> fireTimestamp.add(nextFireTimestamp);
>
> return TriggerResult.CONTINUE;
> }
>
> return TriggerResult.CONTINUE;
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)