Ah, now I remember that the Flink runner did never support processing-time
timers. I created a Jira issue for this:
https://issues.apache.org/jira/browse/BEAM-615

On Thu, 1 Sep 2016 at 19:20 Chawla,Sumit <[email protected]> wrote:

> Thanks Ajioscha\Thomas
>
> I will explore on the option to upgrade.  Meanwhile here is what observed
> with the above code in my local Flink Cluster.
>
> 1.  To start there are 0 records in Kafka
> 2.  Deploy the pipeline.  Two records are received in Kafka at time
> 10:00:00 AM
> 3.  The Pane with 100 records would not fire because expected data is not
> there.  I would expect the 30 sec based filter to fire and downstream to
> receive the record around 10:00:30 AM.
>
> 4.  No new records are arriving.  The downstream received the above record
> around 10 minutes later around 10:10:00 AM
>
> I am not sure whats actually triggering the window firing here.  ( does not
> look like to be 30 sec trigger)
>
>
>
> Regards
> Sumit Chawla
>
>
> On Wed, Aug 31, 2016 at 11:14 PM, Aljoscha Krettek <[email protected]>
> wrote:
>
> > Ah I see, the Flink Runner had quite some updates in 0.2.0-incubating and
> > even more for the upcoming 0.3.0-incubating.
> >
> > On Thu, 1 Sep 2016 at 04:09 Thomas Groh <[email protected]>
> wrote:
> >
> > > In 0.2.0-incubating and beyond we've replaced the DirectPipelineRunner
> > with
> > > the DirectRunner (formerly InProcessPipelineRunner), which is capable
> of
> > > handling Unbounded Pipelines. Is it possible for you to upgrade?
> > >
> > > On Wed, Aug 31, 2016 at 5:17 PM, Chawla,Sumit <[email protected]>
> > > wrote:
> > >
> > > > @Ajioscha,  My assumption is here that atleast one trigger should
> fire.
> > > > Either the 100 elements or the 30 second since first element.
> > (whichever
> > > > happens first)
> > > >
> > > > @Thomas - here is the error i get: I am using 0.1.0-incubating
> > > >
> > > > *ava.lang.IllegalStateException: no evaluator registered for
> > > > Read(UnboundedKafkaSource)*
> > > >
> > > > * at
> > > > org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.
> > > > visitPrimitiveTransform(DirectPipelineRunner.java:890)*
> > > > * at
> > > > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > > > TransformTreeNode.java:225)*
> > > > * at
> > > > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > > > TransformTreeNode.java:220)*
> > > > * at
> > > > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > > > TransformTreeNode.java:220)*
> > > > * a*
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > >
> > > > On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek <
> > [email protected]>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > could the reason for the second part of the trigger never firing be
> > > that
> > > > > there are never at least 100 elements per key. The trigger would
> only
> > > > fire
> > > > > if it saw 100 elements and with only 540 elements that seems
> unlikely
> > > if
> > > > > you have more than 6 keys.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Wed, 31 Aug 2016 at 17:47 Thomas Groh <[email protected]
> >
> > > > wrote:
> > > > >
> > > > > > KafkaIO is implemented using the UnboundedRead API, which is
> > > supported
> > > > by
> > > > > > the DirectRunner. You should be able to run without the
> > > > > withMaxNumRecords;
> > > > > > if you can't, I'd be very interested to see the stack trace that
> > you
> > > > get
> > > > > > when you try to run the Pipeline.
> > > > > >
> > > > > > On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <
> > > [email protected]
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Yes.  I added it only for DirectRunner as it cannot translate
> > > > > > > Read(UnboundedSourceOfKafka)
> > > > > > >
> > > > > > > Regards
> > > > > > > Sumit Chawla
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <
> > > > > [email protected]>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Ah ok, this might be a stupid question but did you remove
> this
> > > line
> > > > > > when
> > > > > > > > running it with Flink:
> > > > > > > >         .withMaxNumRecords(500)
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Aljoscha
> > > > > > > >
> > > > > > > > On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <
> > > [email protected]>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Aljoscha
> > > > > > > > >
> > > > > > > > > The code is not different while running on Flink.  It have
> > > > removed
> > > > > > > > business
> > > > > > > > > specific transformations only.
> > > > > > > > >
> > > > > > > > > Regards
> > > > > > > > > Sumit Chawla
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <
> > > > > > [email protected]
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > > could you maybe also post the complete that you're using
> > with
> > > > the
> > > > > > > > > > FlinkRunner? I could have a look into it.
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Aljoscha
> > > > > > > > > >
> > > > > > > > > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <
> > > > > [email protected]>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Thomas
> > > > > > > > > > >
> > > > > > > > > > > Sorry i tried with DirectRunner but ran into some kafka
> > > > issues.
> > > > > > > > > > Following
> > > > > > > > > > > is the snippet i am working on, and will post more
> > details
> > > > > once i
> > > > > > > get
> > > > > > > > > it
> > > > > > > > > > > working ( as of now i am unable to read messages from
> > Kafka
> > > > > using
> > > > > > > > > > > DirectRunner)
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > PipelineOptions pipelineOptions =
> > > > > > PipelineOptionsFactory.create();
> > > > > > > > > > > pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > > > > > > > > Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > > > > >         .withMaxNumRecords(500)
> > > > > > > > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > > > > > > > >         .withBootstrapServers("localhost:9092")
> > > > > > > > > > >         .updateConsumerProperties(ImmutableMap.of(
> > > > > > > > > > >                 ConsumerConfig.GROUP_ID_CONFIG,
> "test1",
> > > > > > > > > > >                 ConsumerConfig.ENABLE_AUTO_
> > COMMIT_CONFIG,
> > > > > "true",
> > > > > > > > > > >
>  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> > > > > > > "earliest"
> > > > > > > > > > >         ))).apply(ParDo.of(new DoFn<KafkaRecord<byte[],
> > > > > byte[]>,
> > > > > > > > > > > KV<String, String>>() {
> > > > > > > > > > >     @Override
> > > > > > > > > > >     public void processElement(ProcessContext c) throws
> > > > > > Exception {
> > > > > > > > > > >         KV<byte[], byte[]> record =
> c.element().getKV();
> > > > > > > > > > >         c.output(KV.of(new String(record.getKey()), new
> > > > > > > > > > > String(record.getValue())));
> > > > > > > > > > >     }
> > > > > > > > > > > }))
> > > > > > > > > > >         .apply("WindowByMinute", Window.<KV<String,
> > > > > > > > > > > String>>into(FixedWindows.of(
> > Duration.standardSeconds(10)))
> > > > > > > > > > >                 .withAllowedLateness(Duration.
> > > > > standardSeconds(1))
> > > > > > > > > > >                 .triggering(
> > > > > > > > > > >                         Repeatedly.forever(
> > > > > > > > > > >                                 AfterFirst.of(
> > > > > > > > > > >
> > > > > > > > > > > AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > >
> > > > > > > > > > > .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > > > > >
> > > > > > > >  AfterPane.elementCountAtLeast(
> > > > > > > > > > 100)
> > > > > > > > > > >                                 )))
> > > > > > > > > > >                 .discardingFiredPanes())
> > > > > > > > > > >         .apply("GroupByTenant", GroupByKey.create())
> > > > > > > > > > >         .apply(ParDo.of(new DoFn<KV<String,
> > > > Iterable<String>>,
> > > > > > > > Void>()
> > > > > > > > > {
> > > > > > > > > > >             @Override
> > > > > > > > > > >             public void processElement(ProcessContext
> c)
> > > > throws
> > > > > > > > > > Exception {
> > > > > > > > > > >                 KV<String, Iterable<String>> element =
> > > > > > c.element();
> > > > > > > > > > >                 Iterator<String> iterator =
> > > > > > > > > > element.getValue().iterator();
> > > > > > > > > > >                 int count = 0;
> > > > > > > > > > >                 while (iterator.hasNext()) {
> > > > > > > > > > >                     iterator.next();
> > > > > > > > > > >                     count++;
> > > > > > > > > > >                 }
> > > > > > > > > > >                 System.out.println(String.format("Key
> %s
> > > > Value
> > > > > > > Count
> > > > > > > > > > > %d", element.getKey(), count));
> > > > > > > > > > >             }
> > > > > > > > > > >         }));
> > > > > > > > > > > pipeline.run();
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Regards
> > > > > > > > > > > Sumit Chawla
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh
> > > > > > > > <[email protected]
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > If you use the DirectRunner, do you observe the same
> > > > > behavior?
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <
> > > > > > > > > [email protected]>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Thomas
> > > > > > > > > > > > >
> > > > > > > > > > > > > I am using FlinkRunner.  Yes the second part of
> > trigger
> > > > > never
> > > > > > > > fires
> > > > > > > > > > for
> > > > > > > > > > > > me,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regards
> > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh
> > > > > > > > > > <[email protected]
> > > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hey Sumit;
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > What runner are you using? I can set up a test
> with
> > > the
> > > > > > same
> > > > > > > > > > trigger
> > > > > > > > > > > > > > reading from an unbounded input using the
> > > DirectRunner
> > > > > and
> > > > > > I
> > > > > > > > get
> > > > > > > > > > the
> > > > > > > > > > > > > > expected output panes.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Just to clarify, the second half of the trigger
> > > ('when
> > > > > the
> > > > > > > > first
> > > > > > > > > > > > element
> > > > > > > > > > > > > > has been there for at least 30+ seconds') simply
> > > never
> > > > > > fires?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <
> > > > > > > > > > > [email protected]>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Thomas
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > That did not work.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I tried following instead:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > .triggering(
> > > > > > > > > > > > > > >         Repeatedly.forever(
> > > > > > > > > > > > > > >                 AfterFirst.of(
> > > > > > > > > > > > > > >
> >  AfterProcessingTime.
> > > > > > > > > > > > > > pastFirstElementInPane()
> > > > > > > > > > > > > > >
> > > > > > > >  .plusDelayOf(Duration.standard
> > > > > > > > > > > > > > > Seconds(30)),
> > > > > > > > > > > > > > >
> > > > > > > > >  AfterPane.elementCountAtLeast(100)
> > > > > > > > > > > > > > >                         )))
> > > > > > > > > > > > > > > .discardingFiredPanes()
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > What i am trying to do here.  This is to make
> > sure
> > > > that
> > > > > > > > > followup
> > > > > > > > > > > > > > > operations receive batches of records.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1.  Fire when at Pane has 100+ elements
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 2.  Or Fire when the first element has been
> there
> > > for
> > > > > > > atleast
> > > > > > > > > 30
> > > > > > > > > > > > sec+.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > However,  2 point does not seem to work.  e.g.
> I
> > > have
> > > > > 540
> > > > > > > > > records
> > > > > > > > > > > in
> > > > > > > > > > > > > > > Kafka.  The first 500 records are available
> > > > > immediately,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > but the remaining 40 don't pass through. I was
> > > > > expecting
> > > > > > > 2nd
> > > > > > > > to
> > > > > > > > > > > > > > > trigger to help here.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh
> > > > > > > > > > > > <[email protected]
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > You can adjust the trigger in the windowing
> > > > transform
> > > > > > if
> > > > > > > > your
> > > > > > > > > > > sink
> > > > > > > > > > > > > can
> > > > > > > > > > > > > > > > handle being written to multiple times for
> the
> > > same
> > > > > > > window.
> > > > > > > > > For
> > > > > > > > > > > > > > example,
> > > > > > > > > > > > > > > if
> > > > > > > > > > > > > > > > the sink appends to the output when it
> receives
> > > new
> > > > > > data
> > > > > > > > in a
> > > > > > > > > > > > window,
> > > > > > > > > > > > > > you
> > > > > > > > > > > > > > > > could add something like
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Window.into(...).withAllowedLateness(...).
> > > > > > > > > > > > triggering(AfterWatermark.
> > > > > > > > > > > > > > > > pastEndOfWindow().withEarlyFirings(
> > > > > AfterProcessingTime.
> > > > > > > > > > > > > > > >
> pastFirstElementInPane().withDelayOf(Duration.
> > > > > > > > > > > > standardSeconds(5))).
> > > > > > > > > > > > > > > > withLateFirings(AfterPane.
> > > > elementCountAtLeast(1))).
> > > > > > > > discardin
> > > > > > > > > > > > > > > gFiredPanes();
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > This will cause elements to be output some
> > amount
> > > > of
> > > > > > time
> > > > > > > > > after
> > > > > > > > > > > > they
> > > > > > > > > > > > > > are
> > > > > > > > > > > > > > > > first received from Kafka, even if Kafka does
> > not
> > > > > have
> > > > > > > any
> > > > > > > > > new
> > > > > > > > > > > > > > elements.
> > > > > > > > > > > > > > > > Elements will only be output by the
> GroupByKey
> > > > once.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > We should still have a JIRA to improve the
> > > KafkaIO
> > > > > > > > watermark
> > > > > > > > > > > > tracking
> > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > the absence of new records .
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, Aug 25, 2016 at 10:29 AM,
> Chawla,Sumit
> > <
> > > > > > > > > > > > > [email protected]
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks Raghu.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I don't have much control over changing
> > KafkaIO
> > > > > > > > properties.
> > > > > > > > > > I
> > > > > > > > > > > > > added
> > > > > > > > > > > > > > > > > KafkaIO code for completing the example.
> Are
> > > > there
> > > > > > any
> > > > > > > > > > changes
> > > > > > > > > > > > > that
> > > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > done to Windowing to achieve the same
> > behavior?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu
> Angadi
> > > > > > > > > > > > > > > <[email protected]
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The default implementation returns
> > processing
> > > > > > > timestamp
> > > > > > > > > of
> > > > > > > > > > > the
> > > > > > > > > > > > > last
> > > > > > > > > > > > > > > > > record
> > > > > > > > > > > > > > > > > > (in effect. more accurately it returns
> same
> > > as
> > > > > > > > > > > getTimestamp(),
> > > > > > > > > > > > > > which
> > > > > > > > > > > > > > > > > might
> > > > > > > > > > > > > > > > > > overridden by user).
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > As a work around, yes, you can provide
> your
> > > own
> > > > > > > > > watermarkFn
> > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > essentially returns Now() or Now()-1sec.
> > > (usage
> > > > > in
> > > > > > > > > javadoc
> > > > > > > > > > > > > > > > > > <https://github.com/apache/
> > > > > > > incubator-beam/blob/master/
> > > > > > > > > > > > > > > > > > sdks/java/io/kafka/src/main/
> > > > > > > > java/org/apache/beam/sdk/io/
> > > > > > > > > > > > > > > > > > kafka/KafkaIO.java#L138>
> > > > > > > > > > > > > > > > > > )
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I think default watermark should be
> > smarter.
> > > it
> > > > > > > should
> > > > > > > > > > > advance
> > > > > > > > > > > > to
> > > > > > > > > > > > > > > > current
> > > > > > > > > > > > > > > > > > time if there aren't any records to read
> > from
> > > > > > Kafka.
> > > > > > > > > Could
> > > > > > > > > > > you
> > > > > > > > > > > > > > file a
> > > > > > > > > > > > > > > > > jira?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > thanks,
> > > > > > > > > > > > > > > > > > Raghu.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Wed, Aug 24, 2016 at 2:10 PM,
> > > Chawla,Sumit <
> > > > > > > > > > > > > > > [email protected]>
> > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hi All
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I am trying to do some simple batch
> > > > processing
> > > > > on
> > > > > > > > > KafkaIO
> > > > > > > > > > > > > > records.
> > > > > > > > > > > > > > > > My
> > > > > > > > > > > > > > > > > > beam
> > > > > > > > > > > > > > > > > > > pipeline looks like following:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > > > > > > > > > > > > >         .withTopics(ImmutableList.of(
> > > > > s"mytopic"))
> > > > > > > > > > > > > > > > > > >         .withBootstrapServers("
> > > > > localhost:9200")
> > > > > > > > > > > > > > > > > > > .apply("ExtractMessage", ParDo.of(new
> > > > > > > > > > ExtractKVMessage()))
> > > > > > > > > > > //
> > > > > > > > > > > > > > > Emits a
> > > > > > > > > > > > > > > > > > > KV<String,String>
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > .apply("WindowBy10Sec",
> > Window.<KV<String,
> > > > > > > > > > > > > > > > > > > JSONObject>>into(FixedWindows.
> > > > > > > > > > of(Duration.standardSeconds(
> > > > > > > > > > > > > > > > > > > 10))).withAllowedLateness(
> > > > > > > > Duration.standardSeconds(1)))
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > .apply("GroupByKey",
> GroupByKey.create())
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > .apply("Sink", ParDo.of(new MySink())
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > My Kafka Source already has some
> messages
> > > > > 1000+,
> > > > > > > and
> > > > > > > > > new
> > > > > > > > > > > > > messages
> > > > > > > > > > > > > > > > > arrive
> > > > > > > > > > > > > > > > > > > every few minutes.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > When i start my pipeline,  i can see
> that
> > > it
> > > > > > reads
> > > > > > > > all
> > > > > > > > > > the
> > > > > > > > > > > > > 1000+
> > > > > > > > > > > > > > > > > messages
> > > > > > > > > > > > > > > > > > > from Kafka.  However, Window does not
> > fire
> > > > > > untill a
> > > > > > > > new
> > > > > > > > > > > > message
> > > > > > > > > > > > > > > > arrives
> > > > > > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > > > Kafka.  And Sink does not receive any
> > > message
> > > > > > until
> > > > > > > > > that
> > > > > > > > > > > > point.
> > > > > > > > > > > > > > > Do i
> > > > > > > > > > > > > > > > > > need
> > > > > > > > > > > > > > > > > > > to override the WaterMarkFn here?
> Since i
> > > am
> > > > > not
> > > > > > > > > > providing
> > > > > > > > > > > > any
> > > > > > > > > > > > > > > > > > timeStampFn
> > > > > > > > > > > > > > > > > > > , i am assuming that timestamps will be
> > > > > assigned
> > > > > > as
> > > > > > > > in
> > > > > > > > > > when
> > > > > > > > > > > > > > message
> > > > > > > > > > > > > > > > > > arrives
> > > > > > > > > > > > > > > > > > > i.e. ingestion time.  What is the
> default
> > > > > > > WaterMarkFn
> > > > > > > > > > > > > > > implementation?
> > > > > > > > > > > > > > > > > Is
> > > > > > > > > > > > > > > > > > > the Window not supposed to be fired
> based
> > > on
> > > > > > > > Ingestion
> > > > > > > > > > > time?
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to