[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352010#comment-16352010 ]
huangjianhuang commented on BEAM-3414:
--------------------------------------

[~kenn] Has the jstorm-runner been released? The [official documentation|https://beam.apache.org/documentation/runners/jstorm/] says we can add it with:
{code:xml}
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-jstorm</artifactId>
  <version>2.2.0</version>
</dependency>
{code}
But the dependency does not seem to be available.

> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
>                 Key: BEAM-3414
>                 URL: https://issues.apache.org/jira/browse/BEAM-3414
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.2.0
>         Environment: idea, ubuntu 16.04, FlinkRunner
>            Reporter: huangjianhuang
>            Assignee: Aljoscha Krettek
>            Priority: Major
>
> In my demo, I read data from Kafka, count globally, and finally output the
> total count of received data, as follows:
> {code:java}
> FlinkPipelineOptions options =
>     PipelineOptionsFactory.fromArgs(args).withValidation()
>         .as(FlinkPipelineOptions.class);
> options.setStreaming(true);
> options.setRunner(FlinkRunner.class);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>     .apply("Read from kafka",
>         KafkaIO.<String, String>read()
>             // .withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
>             .withBootstrapServers("localhost:9092")
>             .withTopic("recharge")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>             .withoutMetadata()
>     )
>     .apply(Values.create())
>     .apply(Window.<String>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(
>             AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>         .accumulatingFiredPanes()
>     )
>     .apply(Count.globally())
>     .apply("output",
>         ParDo.of(new DoFn<Long, Void>() {
>             @ProcessElement
>             public void process(ProcessContext context) {
>                 System.out.println("---get at: " + Instant.now() + "------");
>                 System.out.println(context.element());
>             }
>         }));
> {code}
> The result should be displayed 5 seconds after I send the first data, but
> sometimes nothing is displayed after I send data. The outputs I got in one
> test are shown below (I can't upload a picture, so they are given as text):
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
> ---get at: 2018-01-05T06:34:36.668Z------
> 681
> Send 681Msg at: 2018-01-05T06:34:47.166
> ---get at: 2018-01-05T06:34:52.284Z------
> 1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
> ---get at: 2018-01-05T06:35:22.112Z------
> 2044
> {code}
> By the way, the code works fine with the direct runner.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)