huangjianhuang created BEAM-3414:
------------------------------------

             Summary: AfterProcessingTime trigger issue with Flink Runner
                 Key: BEAM-3414
                 URL: https://issues.apache.org/jira/browse/BEAM-3414
             Project: Beam
          Issue Type: Bug
          Components: runner-core, runner-flink
    Affects Versions: 2.2.0
         Environment: idea, ubuntu 16.04, FlinkRunner
            Reporter: huangjianhuang
            Assignee: Kenneth Knowles


In my demo, I read data from Kafka, count it globally, and output the total 
number of records received so far, as follows:

{code:java}
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(FlinkPipelineOptions.class);

        options.setStreaming(true);
        options.setRunner(FlinkRunner.class);
        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply("Read from kafka",
                        KafkaIO.<String, String>read()
//                                .withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
                                .withBootstrapServers("localhost:9092")
                                .withTopic("recharge")
                                .withKeyDeserializer(StringDeserializer.class)
                                .withValueDeserializer(StringDeserializer.class)
                                .withoutMetadata()
                )
                .apply(Values.create())
                .apply(Window.<String>into(new GlobalWindows())
                                .triggering(Repeatedly.forever(
                                        AfterProcessingTime.pastFirstElementInPane()
                                                .plusDelayOf(Duration.standardSeconds(5))))
                                .accumulatingFiredPanes()
                )
                .apply(Count.globally())
                .apply("output",
                        ParDo.of(new DoFn<Long, Void>() {
                            @ProcessElement
                            public void process(ProcessContext context) {
                                System.out.println("---get at: " + Instant.now() + "------");
                                System.out.println(context.element());
                            }
                        }));
{code}

The result should be displayed about 5 seconds after I send the first data 
for a pane, but sometimes nothing is displayed after I send data. The output 
I got in one test is shown below (I can't upload a picture, so it is given 
as text):

{code:java}
Send 681Msg at: 2018-01-05T06:34:31.436

        ---get at: 2018-01-05T06:34:36.668Z------
        681

Send 681Msg at: 2018-01-05T06:34:47.166

        ---get at: 2018-01-05T06:34:52.284Z------
        1362

Send 681Msg at: 2018-01-05T06:34:55.505

Send 681Msg at: 2018-01-05T06:35:22.068

        ---get at: 2018-01-05T06:35:22.112Z------
        2044
{code}
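
For comparison, here is a minimal sketch of when the trigger would be expected to fire for the send times above. It is not Beam code: the class and method names are hypothetical, it assumes the send times are UTC and already sorted, and it treats each batch as arriving at its send time (Kafka and runner latency are ignored). Each pane starts at the first arrival after the previous firing and fires 5 seconds later.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Beam: models
// Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
// for a sorted list of arrival times.
class ExpectedFirings {
    static List<Instant> expectedFirings(List<Instant> arrivals, Duration delay) {
        List<Instant> firings = new ArrayList<>();
        Instant pending = null; // firing time of the currently open pane, if any
        for (Instant arrival : arrivals) {
            // An arrival at or after the pending firing time opens a new pane.
            if (pending == null || !arrival.isBefore(pending)) {
                pending = arrival.plus(delay);
                firings.add(pending);
            }
            // Otherwise the element joins the open pane and sets no new timer.
        }
        return firings;
    }

    public static void main(String[] args) {
        // Send times from the log above, assumed to be UTC.
        List<Instant> sends = Arrays.asList(
                Instant.parse("2018-01-05T06:34:31.436Z"),
                Instant.parse("2018-01-05T06:34:47.166Z"),
                Instant.parse("2018-01-05T06:34:55.505Z"),
                Instant.parse("2018-01-05T06:35:22.068Z"));
        for (Instant firing : expectedFirings(sends, Duration.ofSeconds(5))) {
            System.out.println("expected firing at: " + firing);
        }
    }
}
```

Under these assumptions the third batch should produce output at about 06:35:00.505, but the log shows nothing until 06:35:22.112, right after the fourth batch was sent. The count printed then, 2044, equals 3 * 681 + 1, which suggests the overdue pane only fired once a new element arrived.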

By the way, the same code works fine with the DirectRunner.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
