[ 
https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314517#comment-16314517
 ] 

huangjianhuang commented on BEAM-3414:
--------------------------------------

Thanks for help. Does this bug appear in other Runner? Or can you give me some 
advice which Runner is more close to the DirectRunner. My codes works fine with 
DirectRunner but got so many problems with FlinkRunner:(

> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
>                 Key: BEAM-3414
>                 URL: https://issues.apache.org/jira/browse/BEAM-3414
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.2.0
>         Environment: idea, ubuntu 16.04, FlinkRunner
>            Reporter: huangjianhuang
>            Assignee: Kenneth Knowles
>
> in my demo, I read data from kafka and count globally, finally output the 
> total count of recieved data, as follow:
> {code:java}
>         FlinkPipelineOptions options = 
> PipelineOptionsFactory.fromArgs(args).withValidation()
>                 .as(FlinkPipelineOptions.class);
>         options.setStreaming(true);
>         options.setRunner(FlinkRunner.class);
>         Pipeline pipeline = Pipeline.create(options);
>         pipeline
>                 .apply("Read from kafka",
>                         KafkaIO.<String, String>read()
> //                                .withTimestampFn(kafkaData -> 
> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
>                                 .withBootstrapServers("localhost:9092")
>                                 .withTopic("recharge")
>                                 .withKeyDeserializer(StringDeserializer.class)
>                                 
> .withValueDeserializer(StringDeserializer.class)
>                                 .withoutMetadata()
>                 )
>                 .apply(Values.create())
>                 .apply(Window.<String>into(new GlobalWindows())
>                                 .triggering(Repeatedly.forever(
>                                         
> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>                                 .accumulatingFiredPanes()
>                 )
>                 .apply(Count.globally())
>                 .apply("output",
>                         ParDo.of(new DoFn<Long, Void>() {
>                             @ProcessElement
>                             public void process(ProcessContext context) {
>                                 System.out.println("---get at: " + 
> Instant.now() + "------");
>                                 System.out.println(context.element());
>                             }
>                         }));
> {code}
> the result should be displayed after (5s) I sent first data, but sometimes 
> there were nothing display after I sent data. the pic shows the outputs i got 
> in a test:
> (cant upload a pic, desc as text)
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
>       ---get at: 2018-01-05T06:34:36.668Z------
>       681
> Send 681Msg at: 2018-01-05T06:34:47.166
>       ---get at: 2018-01-05T06:34:52.284Z------
>       1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
>       ---get at: 2018-01-05T06:35:22.112Z------
>       2044
> {code}
> btw, the code works fine with direct runner.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to