huangjianhuang created BEAM-3414:
------------------------------------
Summary: AfterProcessingTime trigger issue with Flink Runner
Key: BEAM-3414
URL: https://issues.apache.org/jira/browse/BEAM-3414
Project: Beam
Issue Type: Bug
Components: runner-core, runner-flink
Affects Versions: 2.2.0
Environment: idea, ubuntu 16.04, FlinkRunner
Reporter: huangjianhuang
Assignee: Kenneth Knowles
in my demo, I read data from kafka and count globally, finally output the total
count of recieved data, as follow:
{code:java}
FlinkPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation()
.as(FlinkPipelineOptions.class);
options.setStreaming(true);
options.setRunner(FlinkRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("Read from kafka",
KafkaIO.<String, String>read()
// .withTimestampFn(kafkaData ->
TimeUtil.timeMillisToInstant(kafkaData.getKey()))
.withBootstrapServers("localhost:9092")
.withTopic("recharge")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata()
)
.apply(Values.create())
.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
.accumulatingFiredPanes()
)
.apply(Count.globally())
.apply("output",
ParDo.of(new DoFn<Long, Void>() {
@ProcessElement
public void process(ProcessContext context) {
System.out.println("---get at: " +
Instant.now() + "------");
System.out.println(context.element());
}
}));
{code}
the result should be displayed after (5s) I sent first data, but sometimes
there were nothing display after I sent data. the pic shows the outputs i got
in a test:
(cant upload a pic, desc as text)
{code:java}
Send 681Msg at: 2018-01-05T06:34:31.436
---get at: 2018-01-05T06:34:36.668Z------
681
Send 681Msg at: 2018-01-05T06:34:47.166
---get at: 2018-01-05T06:34:52.284Z------
1362
Send 681Msg at: 2018-01-05T06:34:55.505
Send 681Msg at: 2018-01-05T06:35:22.068
---get at: 2018-01-05T06:35:22.112Z------
2044
{code}
btw, the code works fine with direct runner.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)