Ah ok, this might be a stupid question but did you remove this line when
running it with Flink:
.withMaxNumRecords(500)
Cheers,
Aljoscha
On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <[email protected]> wrote:
> Hi Aljoscha
>
> The code is not different while running on Flink. It have removed business
> specific transformations only.
>
> Regards
> Sumit Chawla
>
>
> On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <[email protected]>
> wrote:
>
> > Hi,
> > could you maybe also post the complete that you're using with the
> > FlinkRunner? I could have a look into it.
> >
> > Cheers,
> > Aljoscha
> >
> > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <[email protected]>
> wrote:
> >
> > > Hi Thomas
> > >
> > > Sorry i tried with DirectRunner but ran into some kafka issues.
> > Following
> > > is the snippet i am working on, and will post more details once i get
> it
> > > working ( as of now i am unable to read messages from Kafka using
> > > DirectRunner)
> > >
> > >
> > > PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
> > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > pipeline.apply(KafkaIO.read()
> > > .withMaxNumRecords(500)
> > > .withTopics(ImmutableList.of("mytopic"))
> > > .withBootstrapServers("localhost:9092")
> > > .updateConsumerProperties(ImmutableMap.of(
> > > ConsumerConfig.GROUP_ID_CONFIG, "test1",
> > > ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
> > > ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
> > > ))).apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>,
> > > KV<String, String>>() {
> > > @Override
> > > public void processElement(ProcessContext c) throws Exception {
> > > KV<byte[], byte[]> record = c.element().getKV();
> > > c.output(KV.of(new String(record.getKey()), new
> > > String(record.getValue())));
> > > }
> > > }))
> > > .apply("WindowByMinute", Window.<KV<String,
> > > String>>into(FixedWindows.of(Duration.standardSeconds(10)))
> > > .withAllowedLateness(Duration.standardSeconds(1))
> > > .triggering(
> > > Repeatedly.forever(
> > > AfterFirst.of(
> > >
> > > AfterProcessingTime.pastFirstElementInPane()
> > >
> > > .plusDelayOf(Duration.standardSeconds(30)),
> > > AfterPane.elementCountAtLeast(
> > 100)
> > > )))
> > > .discardingFiredPanes())
> > > .apply("GroupByTenant", GroupByKey.create())
> > > .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>()
> {
> > > @Override
> > > public void processElement(ProcessContext c) throws
> > Exception {
> > > KV<String, Iterable<String>> element = c.element();
> > > Iterator<String> iterator =
> > element.getValue().iterator();
> > > int count = 0;
> > > while (iterator.hasNext()) {
> > > iterator.next();
> > > count++;
> > > }
> > > System.out.println(String.format("Key %s Value Count
> > > %d", element.getKey(), count));
> > > }
> > > }));
> > > pipeline.run();
> > >
> > >
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <[email protected]
> >
> > > wrote:
> > >
> > > > If you use the DirectRunner, do you observe the same behavior?
> > > >
> > > > On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <
> [email protected]>
> > > > wrote:
> > > >
> > > > > Hi Thomas
> > > > >
> > > > > I am using FlinkRunner. Yes the second part of trigger never fires
> > for
> > > > me,
> > > > >
> > > > > Regards
> > > > > Sumit Chawla
> > > > >
> > > > >
> > > > > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh
> > <[email protected]
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hey Sumit;
> > > > > >
> > > > > > What runner are you using? I can set up a test with the same
> > trigger
> > > > > > reading from an unbounded input using the DirectRunner and I get
> > the
> > > > > > expected output panes.
> > > > > >
> > > > > > Just to clarify, the second half of the trigger ('when the first
> > > > element
> > > > > > has been there for at least 30+ seconds') simply never fires?
> > > > > >
> > > > > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <
> > > [email protected]>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Thomas
> > > > > > >
> > > > > > > That did not work.
> > > > > > >
> > > > > > > I tried following instead:
> > > > > > >
> > > > > > > .triggering(
> > > > > > > Repeatedly.forever(
> > > > > > > AfterFirst.of(
> > > > > > > AfterProcessingTime.
> > > > > > pastFirstElementInPane()
> > > > > > > .plusDelayOf(Duration.standard
> > > > > > > Seconds(30)),
> > > > > > >
> AfterPane.elementCountAtLeast(100)
> > > > > > > )))
> > > > > > > .discardingFiredPanes()
> > > > > > >
> > > > > > > What i am trying to do here. This is to make sure that
> followup
> > > > > > > operations receive batches of records.
> > > > > > >
> > > > > > > 1. Fire when at Pane has 100+ elements
> > > > > > >
> > > > > > > 2. Or Fire when the first element has been there for atleast
> 30
> > > > sec+.
> > > > > > >
> > > > > > > However, 2 point does not seem to work. e.g. I have 540
> records
> > > in
> > > > > > > Kafka. The first 500 records are available immediately,
> > > > > > >
> > > > > > > but the remaining 40 don't pass through. I was expecting 2nd to
> > > > > > > trigger to help here.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Regards
> > > > > > > Sumit Chawla
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh
> > > > <[email protected]
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > You can adjust the trigger in the windowing transform if your
> > > sink
> > > > > can
> > > > > > > > handle being written to multiple times for the same window.
> For
> > > > > > example,
> > > > > > > if
> > > > > > > > the sink appends to the output when it receives new data in a
> > > > window,
> > > > > > you
> > > > > > > > could add something like
> > > > > > > >
> > > > > > > > Window.into(...).withAllowedLateness(...).
> > > > triggering(AfterWatermark.
> > > > > > > > pastEndOfWindow().withEarlyFirings(AfterProcessingTime.
> > > > > > > > pastFirstElementInPane().withDelayOf(Duration.
> > > > standardSeconds(5))).
> > > > > > > > withLateFirings(AfterPane.elementCountAtLeast(1))).discardin
> > > > > > > gFiredPanes();
> > > > > > > >
> > > > > > > > This will cause elements to be output some amount of time
> after
> > > > they
> > > > > > are
> > > > > > > > first received from Kafka, even if Kafka does not have any
> new
> > > > > > elements.
> > > > > > > > Elements will only be output by the GroupByKey once.
> > > > > > > >
> > > > > > > > We should still have a JIRA to improve the KafkaIO watermark
> > > > tracking
> > > > > > in
> > > > > > > > the absence of new records .
> > > > > > > >
> > > > > > > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <
> > > > > [email protected]
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Raghu.
> > > > > > > > >
> > > > > > > > > I don't have much control over changing KafkaIO properties.
> > I
> > > > > added
> > > > > > > > > KafkaIO code for completing the example. Are there any
> > changes
> > > > > that
> > > > > > > can
> > > > > > > > be
> > > > > > > > > done to Windowing to achieve the same behavior?
> > > > > > > > >
> > > > > > > > > Regards
> > > > > > > > > Sumit Chawla
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi
> > > > > > > <[email protected]
> > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > The default implementation returns processing timestamp
> of
> > > the
> > > > > last
> > > > > > > > > record
> > > > > > > > > > (in effect. more accurately it returns same as
> > > getTimestamp(),
> > > > > > which
> > > > > > > > > might
> > > > > > > > > > overridden by user).
> > > > > > > > > >
> > > > > > > > > > As a work around, yes, you can provide your own
> watermarkFn
> > > > that
> > > > > > > > > > essentially returns Now() or Now()-1sec. (usage in
> javadoc
> > > > > > > > > > <https://github.com/apache/incubator-beam/blob/master/
> > > > > > > > > > sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/
> > > > > > > > > > kafka/KafkaIO.java#L138>
> > > > > > > > > > )
> > > > > > > > > >
> > > > > > > > > > I think default watermark should be smarter. it should
> > > advance
> > > > to
> > > > > > > > current
> > > > > > > > > > time if there aren't any records to read from Kafka.
> Could
> > > you
> > > > > > file a
> > > > > > > > > jira?
> > > > > > > > > >
> > > > > > > > > > thanks,
> > > > > > > > > > Raghu.
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <
> > > > > > > [email protected]>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi All
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > I am trying to do some simple batch processing on
> KafkaIO
> > > > > > records.
> > > > > > > > My
> > > > > > > > > > beam
> > > > > > > > > > > pipeline looks like following:
> > > > > > > > > > >
> > > > > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > > > > > .withTopics(ImmutableList.of(s"mytopic"))
> > > > > > > > > > > .withBootstrapServers("localhost:9200")
> > > > > > > > > > > .apply("ExtractMessage", ParDo.of(new
> > ExtractKVMessage()))
> > > //
> > > > > > > Emits a
> > > > > > > > > > > KV<String,String>
> > > > > > > > > > >
> > > > > > > > > > > .apply("WindowBy10Sec", Window.<KV<String,
> > > > > > > > > > > JSONObject>>into(FixedWindows.
> > of(Duration.standardSeconds(
> > > > > > > > > > > 10))).withAllowedLateness(Duration.standardSeconds(1)))
> > > > > > > > > > >
> > > > > > > > > > > .apply("GroupByKey", GroupByKey.create())
> > > > > > > > > > >
> > > > > > > > > > > .apply("Sink", ParDo.of(new MySink())
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > My Kafka Source already has some messages 1000+, and
> new
> > > > > messages
> > > > > > > > > arrive
> > > > > > > > > > > every few minutes.
> > > > > > > > > > >
> > > > > > > > > > > When i start my pipeline, i can see that it reads all
> > the
> > > > > 1000+
> > > > > > > > > messages
> > > > > > > > > > > from Kafka. However, Window does not fire untill a new
> > > > message
> > > > > > > > arrives
> > > > > > > > > > in
> > > > > > > > > > > Kafka. And Sink does not receive any message until
> that
> > > > point.
> > > > > > > Do i
> > > > > > > > > > need
> > > > > > > > > > > to override the WaterMarkFn here? Since i am not
> > providing
> > > > any
> > > > > > > > > > timeStampFn
> > > > > > > > > > > , i am assuming that timestamps will be assigned as in
> > when
> > > > > > message
> > > > > > > > > > arrives
> > > > > > > > > > > i.e. ingestion time. What is the default WaterMarkFn
> > > > > > > implementation?
> > > > > > > > > Is
> > > > > > > > > > > the Window not supposed to be fired based on Ingestion
> > > time?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Regards
> > > > > > > > > > > Sumit Chawla
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>