Hi,

Could the reason the second part of the trigger never fires be that there are never at least 100 elements per key? The trigger would only fire if it saw 100 elements, and with only 540 elements that seems unlikely if you have more than 6 keys.
Cheers,
Aljoscha

On Wed, 31 Aug 2016 at 17:47, Thomas Groh wrote:

KafkaIO is implemented using the UnboundedRead API, which is supported by the DirectRunner. You should be able to run without withMaxNumRecords; if you can't, I'd be very interested to see the stack trace that you get when you try to run the Pipeline.

On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit wrote:

Yes. I added it only for the DirectRunner, as it cannot translate Read(UnboundedSourceOfKafka).

Regards
Sumit Chawla

On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek wrote:

Ah OK, this might be a stupid question, but did you remove this line when running it with Flink:

    .withMaxNumRecords(500)

Cheers,
Aljoscha

On Wed, 31 Aug 2016 at 08:14, Chawla,Sumit wrote:

Hi Aljoscha

The code is not different while running on Flink. I have removed only business-specific transformations.

Regards
Sumit Chawla

On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek wrote:

Hi,
could you maybe also post the complete code that you're using with the FlinkRunner? I could have a look into it.

Cheers,
Aljoscha

On Tue, 30 Aug 2016 at 09:01, Chawla,Sumit wrote:

Hi Thomas

Sorry, I tried with the DirectRunner but ran into some Kafka issues.
Following is the snippet I am working on. I will post more details once I get it working (as of now I am unable to read messages from Kafka using the DirectRunner).

    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
    pipelineOptions.setRunner(DirectPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(pipelineOptions);
    pipeline.apply(KafkaIO.read()
            .withMaxNumRecords(500)
            .withTopics(ImmutableList.of("mytopic"))
            .withBootstrapServers("localhost:9092")
            .updateConsumerProperties(ImmutableMap.of(
                ConsumerConfig.GROUP_ID_CONFIG, "test1",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")))
        // Decode the raw Kafka key/value bytes into a KV<String, String>.
        .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                KV<byte[], byte[]> record = c.element().getKV();
                c.output(KV.of(new String(record.getKey()), new String(record.getValue())));
            }
        }))
        // 10-second fixed windows; fire after 30 s of processing time or 100 elements, whichever comes first.
        .apply("WindowByMinute", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .withAllowedLateness(Duration.standardSeconds(1))
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(30)),
                        AfterPane.elementCountAtLeast(100))))
            .discardingFiredPanes())
        .apply("GroupByTenant", GroupByKey.create())
        // Print how many values each fired pane contains per key.
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                KV<String, Iterable<String>> element = c.element();
                Iterator<String> iterator = element.getValue().iterator();
                int count = 0;
                while (iterator.hasNext()) {
                    iterator.next();
                    count++;
                }
                System.out.println(String.format("Key %s Value Count %d", element.getKey(), count));
            }
        }));
    pipeline.run();

Regards
Sumit Chawla

On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh wrote:

If you use the DirectRunner, do you observe the same behavior?

On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit wrote:

Hi Thomas

I am using the FlinkRunner. Yes, the second part of the trigger never fires for me.

Regards
Sumit Chawla

On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh wrote:

Hey Sumit;

What runner are you using? I can set up a test with the same trigger reading from an unbounded input using the DirectRunner, and I get the expected output panes.

Just to clarify, the second half of the trigger ("when the first element has been there for at least 30+ seconds") simply never fires?
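To make the question concrete, here is a toy Java model of the trigger discussed in this thread, Repeatedly.forever(AfterFirst.of(a 30-second processing-time delay, an element count of at least 100)), evaluated separately for each key. It is a single-threaded sketch under simplifying assumptions, not Beam or runner code, and PerKeyAfterFirstModel and its methods are hypothetical names. It shows that the count branch can only fire for a key that individually reaches 100 elements, so any remainder (such as 40 leftover records, or 540 records spread over 6 or more keys) can only be flushed by the processing-time branch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical toy model (not Beam code) of
 *   Repeatedly.forever(AfterFirst.of(
 *       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay),
 *       AfterPane.elementCountAtLeast(count)))
 * with discarding panes, evaluated independently per key.
 */
class PerKeyAfterFirstModel {
    private final int countThreshold;
    private final long delayMillis;
    // Per-key pane state: buffered element count and processing time of the first buffered element.
    private final Map<String, Integer> paneCounts = new HashMap<>();
    private final Map<String, Long> firstArrival = new HashMap<>();
    // Fired panes recorded as "key:size:reason".
    final List<String> firings = new ArrayList<>();

    PerKeyAfterFirstModel(int countThreshold, long delayMillis) {
        this.countThreshold = countThreshold;
        this.delayMillis = delayMillis;
    }

    /** An element for {@code key} arrives at processing time {@code nowMillis}. */
    void onElement(String key, long nowMillis) {
        firstArrival.putIfAbsent(key, nowMillis);
        int n = paneCounts.merge(key, 1, Integer::sum);
        if (n >= countThreshold) {
            fire(key, "count"); // count branch of AfterFirst
        }
    }

    /** Processing time advances to {@code nowMillis} without any new input. */
    void advanceProcessingTime(long nowMillis) {
        for (String key : new ArrayList<>(firstArrival.keySet())) {
            if (nowMillis >= firstArrival.get(key) + delayMillis) {
                fire(key, "delay"); // processing-time branch of AfterFirst
            }
        }
    }

    // Repeatedly.forever + discarding: the pane is emitted, cleared, and the trigger starts over.
    private void fire(String key, String reason) {
        firings.add(key + ":" + paneCounts.get(key) + ":" + reason);
        paneCounts.remove(key);
        firstArrival.remove(key);
    }
}
```

With all 540 elements on a single key, the model fires five count panes of 100 and keeps 40 buffered until the 30-second delay elapses; spread round-robin over 6 keys, no key reaches 100 and only the delay branch fires, once per key with 90 elements.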
On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit wrote:

Hi Thomas

That did not work.

I tried the following instead:

    .triggering(
        Repeatedly.forever(
            AfterFirst.of(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(30)),
                AfterPane.elementCountAtLeast(100))))
    .discardingFiredPanes()

What I am trying to do here is make sure that follow-up operations receive batches of records:

1. Fire when a pane has 100+ elements.
2. Or fire when the first element has been there for at least 30+ seconds.

However, point 2 does not seem to work. For example, I have 540 records in Kafka. The first 500 records are available immediately, but the remaining 40 don't pass through. I was expecting the second trigger to help here.
Regards
Sumit Chawla

On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh wrote:

You can adjust the trigger in the windowing transform if your sink can handle being written to multiple times for the same window. For example, if the sink appends to the output when it receives new data in a window, you could add something like

    Window.into(...)
        .withAllowedLateness(...)
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(5)))
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes();

This will cause elements to be output some amount of time after they are first received from Kafka, even if Kafka does not have any new elements. Elements will only be output by the GroupByKey once.

We should still have a JIRA to improve the KafkaIO watermark tracking in the absence of new records.
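A toy Java model of the trigger suggested above, for a single key and window with discardingFiredPanes(): an early pane fires once the first buffered element has waited the processing-time delay, an on-time pane fires when the watermark reaches the end of the window, and every late element fires its own pane. This is a simplified sketch, not Beam code: EarlyLateDiscardingModel is a hypothetical name, only non-empty panes are recorded, and allowed lateness (after which Beam drops late data) is ignored. Because each firing clears the buffer, every element lands in exactly one pane.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (not Beam code) of one key/window under
 *   AfterWatermark.pastEndOfWindow()
 *       .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
 *       .withLateFirings(AfterPane.elementCountAtLeast(1))
 * with discardingFiredPanes(). Only non-empty panes are recorded.
 */
class EarlyLateDiscardingModel {
    private final long windowEndMillis;
    private final long earlyDelayMillis;
    private final List<String> buffer = new ArrayList<>();
    private Long firstInPaneMillis = null; // processing time of the first buffered element
    private boolean watermarkPassed = false;
    final List<String> panes = new ArrayList<>(); // e.g. "EARLY[a, b]"

    EarlyLateDiscardingModel(long windowEndMillis, long earlyDelayMillis) {
        this.windowEndMillis = windowEndMillis;
        this.earlyDelayMillis = earlyDelayMillis;
    }

    void onElement(String value, long processingMillis) {
        buffer.add(value);
        if (firstInPaneMillis == null) {
            firstInPaneMillis = processingMillis;
        }
        if (watermarkPassed) {
            fire("LATE"); // late firings: one pane per late element
        }
    }

    /** Early firings depend only on processing time, so they happen even when no input arrives. */
    void advanceProcessingTime(long nowMillis) {
        if (!watermarkPassed && firstInPaneMillis != null
                && nowMillis >= firstInPaneMillis + earlyDelayMillis) {
            fire("EARLY");
        }
    }

    /** Simplified: the on-time pane fires once the watermark reaches the window end. */
    void advanceWatermark(long watermarkMillis) {
        if (!watermarkPassed && watermarkMillis >= windowEndMillis) {
            watermarkPassed = true;
            if (!buffer.isEmpty()) {
                fire("ON_TIME");
            }
        }
    }

    // Discarding mode: emitted elements leave the pane, so each element is output once.
    private void fire(String kind) {
        panes.add(kind + buffer);
        buffer.clear();
        firstInPaneMillis = null;
    }
}
```

Feeding a and b, letting 5 seconds of processing time pass with the watermark stalled, then adding c before the watermark reaches the window end and d afterwards, yields the panes EARLY[a, b], ON_TIME[c], LATE[d].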
On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit wrote:

Thanks Raghu.

I don't have much control over changing KafkaIO properties; I added the KafkaIO code to complete the example. Are there any changes that can be made to the windowing to achieve the same behavior?

Regards
Sumit Chawla

On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi wrote:

The default implementation returns the processing timestamp of the last record (in effect; more accurately, it returns the same value as getTimestamp(), which might be overridden by the user).

As a workaround, yes, you can provide your own watermarkFn that essentially returns Now() or Now() - 1sec (see the usage in the javadoc: <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).

I think the default watermark should be smarter.
It should advance to the current time if there aren't any records to read from Kafka. Could you file a JIRA?

thanks,
Raghu.

On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit wrote:

Hi All

I am trying to do some simple batch processing on KafkaIO records. My Beam pipeline looks like the following:

    pipeline.apply(KafkaIO.read()
            .withTopics(ImmutableList.of("mytopic"))
            .withBootstrapServers("localhost:9200"))
        .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // Emits a KV<String, String>
        .apply("WindowBy10Sec", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .withAllowedLateness(Duration.standardSeconds(1)))
        .apply("GroupByKey", GroupByKey.create())
        .apply("Sink", ParDo.of(new MySink()));

My Kafka source already has 1000+ messages, and new messages arrive every few minutes.

When I start my pipeline, I can see that it reads all 1000+ messages from Kafka. However, the window does not fire until a new message arrives in Kafka, and the sink does not receive any message until that point. Do I need to override the watermarkFn here? Since I am not providing any timestampFn, I am assuming that timestamps will be assigned as each message arrives, i.e. ingestion time. What is the default watermarkFn implementation? Is the window not supposed to fire based on ingestion time?
Regards
Sumit Chawla
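The original question comes down to when the watermark passes the end of a 10-second window. Below is a toy Java model comparing the two watermark policies described in the thread: the default, which (per Raghu) tracks the timestamp of the last record read, and the suggested workaround of returning now minus one second. IdleWatermarkModel and its methods are hypothetical names and do not correspond to the KafkaIO API; windows are assumed to be aligned to the epoch, and the default trigger is modeled as firing once the watermark reaches the window end.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical toy model (not the KafkaIO API) of watermark policies on an idle topic. */
class IdleWatermarkModel {
    /** Start of the epoch-aligned fixed window of the given size that contains ts. */
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs));
    }

    /** Simplified default trigger: the window fires once the watermark reaches its end. */
    static boolean windowFired(Instant ts, Duration size, Instant watermark) {
        Instant end = windowStart(ts, size).plus(size);
        return !watermark.isBefore(end);
    }

    /** Default-style policy: the watermark stays at the last record's timestamp while the topic is idle. */
    static Instant lastRecordWatermark(Instant lastRecordTs, Instant now) {
        return lastRecordTs;
    }

    /** Workaround policy from the thread: watermark = now - 1 second, regardless of input. */
    static Instant nowMinusOneSecond(Instant lastRecordTs, Instant now) {
        return now.minusSeconds(1);
    }
}
```

With the last record at t = 1005 s and processing time at 1020 s, the last-record policy leaves the watermark at 1005 s, so the window [1000 s, 1010 s) stays open until another record arrives; the now-minus-one-second policy yields 1019 s, and the window fires.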
