Please note that each record output from ReadFromKafkaDoFn is in a
GlobalWindow. The workflow is:
ReadFromKafkaDoFn -> Reshuffle -> Window.into(FixedWindows) ->
                        |         Max.longsPerKey -> CommitDoFn
                        |
                        +--> downstream consumers

> but won't there still be 20 commits that happen as fast as possible for
> each of the windows that were constructed from the initial fetch?

I'm not sure what you mean here. Would you like to elaborate on your
question?

On Tue, Dec 8, 2020 at 1:46 PM Vincent Marquez <[email protected]>
wrote:

>
> *~Vincent*
>
>
> On Tue, Dec 8, 2020 at 1:34 PM Boyuan Zhang <[email protected]> wrote:
>
>> Hi Vincent,
>>
>> The Window.into(FixedWindows.of(Duration.standardMinutes(5))) operation
>> just applies window information to each element; it does not actually
>> perform the grouping. In the commit transform, a combine transform
>> (Max.longsPerKey()) is applied on top of that.
>> Window.into(FixedWindows.of(Duration.standardMinutes(5))) + Max.longsPerKey()
>> means one element is output per 5-minute window. This is different from
>> your case, since the trigger in the commit transform only exists to drive
>> the combine.
>> And in order to prevent the data-loss error you mentioned, there is a
>> persistent layer (Reshuffle) between the Kafka read and any downstream
>> transform.
>>
>>
> Apologies, I don't understand how the delay would work here though. If we
> have a Kafka topic that has 100 messages in it, each with a timestamp one
> minute apart, that means 20 windows will be generated from one possible
> fetch output by the ReadFromKafkaDoFn. I understand that Max.longsPerKey()
> will take the max per window, but won't there still be 20 commits that
> happen as fast as possible for each of the windows that were constructed
> from the initial fetch?
>
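The window arithmetic in the question above can be checked directly: FixedWindows assigns each element timestamp to the window starting at `ts - (ts % windowSize)`. A small self-contained Java sketch (an illustration of the assignment rule, not Beam code) shows that 100 timestamps one minute apart land in 20 distinct 5-minute windows:

```java
import java.util.HashSet;
import java.util.Set;

public class FixedWindowCount {
    // Number of distinct fixed windows covering n timestamps spaced stepMs apart.
    static int distinctWindows(int n, long stepMs, long windowSizeMs) {
        Set<Long> starts = new HashSet<>();
        for (int i = 0; i < n; i++) {
            long ts = i * stepMs;
            // Same assignment FixedWindows performs: the start of the window containing ts
            starts.add(ts - (ts % windowSizeMs));
        }
        return starts.size();
    }

    public static void main(String[] args) {
        // 100 messages one minute apart, 5-minute windows -> 20 distinct windows,
        // hence 20 per-window maxima out of Max.longsPerKey() from one fetch.
        System.out.println(distinctWindows(100, 60_000L, 5 * 60_000L)); // prints 20
    }
}
```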
>> For your case, will the pipeline like KafkaRead -> Reshuffle ->
>> GroupIntoBatches -> downstream help you?
>>
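For readers following the GroupIntoBatches suggestion: per key, the transform buffers elements and emits a batch once the configured size is reached (plus any leftovers at the end of the window). The semantics, for a single key, can be illustrated with a plain-Java sketch; the helper name is made up and this is not Beam code:

```java
import java.util.ArrayList;
import java.util.List;

public class Batching {
    // Hypothetical helper mimicking GroupIntoBatches.ofSize(batchSize) for one
    // key: groups an in-order stream of values into fixed-size batches.
    static <T> List<List<T>> intoBatches(List<T> values, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T v : values) {
            current.add(v);
            if (current.size() == batchSize) { // batch is full: emit it
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) { // leftovers emitted when the window closes
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < 1200; i++) pending.add(i);
        // 1200 buffered elements, batch size 500 -> batches of 500, 500, 200
        for (List<Integer> batch : intoBatches(pending, 500)) {
            System.out.println(batch.size());
        }
    }
}
```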
>> On Tue, Dec 8, 2020 at 1:19 PM Vincent Marquez <[email protected]>
>> wrote:
>>
>>> If this is the case that the pipeline has no way of enforcing fixed time
>>> windows, how does this work:
>>>
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L126
>>>
>>> Isn't this supposed to trigger only every five minutes, regardless of
>>> how much data can immediately be grouped together into five-minute
>>> windows? If there is a way to mark that the fixed window should only
>>> trigger every so many minutes, that would solve my use case. If there
>>> isn't a way to do this, the Kafka offset code seems broken and could
>>> result in 'data loss' by improperly committing offsets before they have
>>> run through the rest of the pipeline.
>>>
>>> *~Vincent*
>>>
>>>
>>> On Fri, Oct 16, 2020 at 4:17 AM Maximilian Michels <[email protected]>
>>> wrote:
>>>
>>>> > the downstream consumer has these requirements.
>>>>
>>>> Blocking should normally be avoided at all costs, but if the downstream
>>>> operator has the requirement to only emit a fixed number of messages per
>>>> second, it should enforce this, i.e. block once the maximum number of
>>>> messages for a time period has been reached. This will automatically
>>>> lead to backpressure in runners like Flink or Dataflow.
>>>>
>>>> -Max
>>>>
>>>> On 07.10.20 18:30, Luke Cwik wrote:
>>>> > SplittableDoFns apply to both batch and streaming pipelines. They are
>>>> > allowed to produce an unbounded amount of data and can either
>>>> > self-checkpoint, saying they want to resume later, or the runner will
>>>> > ask them to checkpoint via a split call.
>>>> >
>>>> > There hasn't been anything concrete on backpressure; there has been
>>>> > work done on exposing signals[1] related to IO that a runner can then
>>>> > use intelligently, but throttling isn't one of them yet.
>>>> >
>>>> > 1:
>>>> > https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
>>>> >
>>>> > On Tue, Oct 6, 2020 at 3:51 PM Vincent Marquez
>>>> > <[email protected]> wrote:
>>>> >
>>>> >     Thanks for the response. Is my understanding correct that
>>>> >     SplittableDoFns are only applicable to batch pipelines? I'm
>>>> >     wondering if there are any proposals to address backpressure needs.
>>>> >     /~Vincent/
>>>> >
>>>> >
>>>> >     On Tue, Oct 6, 2020 at 1:37 PM Luke Cwik <[email protected]> wrote:
>>>> >
>>>> >         There is no general backpressure mechanism within Apache Beam
>>>> >         (runners should be intelligent about this, but there is
>>>> >         currently no way to say "I'm being throttled", so runners don't
>>>> >         know that throwing more CPUs at a problem won't make it go
>>>> >         faster).
>>>> >
>>>> >         You can control how quickly you ingest data for runners that
>>>> >         support splittable DoFns with SDK-initiated checkpoints with
>>>> >         resume delays. A splittable DoFn is able to return
>>>> >         resume().withDelay(Duration.standardSeconds(10)) from
>>>> >         the @ProcessElement method. See Watch[1] for an example.
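The self-checkpointing control flow described here can be modeled outside Beam: the reader claims elements until it decides to stop, then reports a delay after which it should be invoked again. A plain-Java sketch of that flow, with a hypothetical Continuation type standing in for Beam's ProcessContinuation (this is not the actual Beam API):

```java
public class ResumeSketch {
    // Hypothetical stand-in for Beam's ProcessContinuation.
    record Continuation(boolean resume, long resumeDelayMs) {}

    // Claim up to maxPerCall elements from [start, end); if work remains,
    // self-checkpoint and ask to be resumed after delayMs (analogous to
    // resume().withDelay(...)); otherwise report the restriction is done.
    static Continuation processUpTo(long start, long end, long maxPerCall, long delayMs) {
        long claimed = 0;
        for (long pos = start; pos < end; pos++) {
            if (claimed == maxPerCall) {
                return new Continuation(true, delayMs); // resume later
            }
            claimed++; // "output" the element at pos
        }
        return new Continuation(false, 0); // restriction exhausted
    }

    public static void main(String[] args) {
        // 1000 pending elements, at most 100 per call: resume after the delay.
        System.out.println(processUpTo(0, 1000, 100, 10_000L));
    }
}
```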
>>>> >
>>>> >         The 2.25.0 release enables more splittable DoFn features on
>>>> >         more runners. I'm working on a blog post (initial draft[2],
>>>> >         still mostly empty) to update the old blog post from 2017.
>>>> >
>>>> >         1:
>>>> >         https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
>>>> >         2:
>>>> >         https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
>>>> >
>>>> >
>>>> >         On Tue, Oct 6, 2020 at 10:39 AM Vincent Marquez
>>>> >         <[email protected]> wrote:
>>>> >
>>>> >             Hmm, I'm not sure how that will help; I understand how to
>>>> >             batch up the data, but it is the triggering part that I
>>>> >             don't see how to do. For example, in Spark Structured
>>>> >             Streaming, you can set a time trigger which fires at a
>>>> >             fixed interval all the way up to the source, so even the
>>>> >             source can throttle how much data to read.
>>>> >
>>>> >             Here is my use case more thoroughly explained:
>>>> >
>>>> >             I have a Kafka topic (with multiple partitions) that I'm
>>>> >             reading from, and I need to aggregate batches of up to 500
>>>> >             before sending a single batch off in an RPC call. However,
>>>> >             the vendor specified a rate limit, so if there are more
>>>> >             than 500 unread messages in the topic, I must wait 1 second
>>>> >             before issuing another RPC call. When searching on Stack
>>>> >             Overflow I found this answer:
>>>> >             https://stackoverflow.com/a/57275557/25658
>>>> >             that makes it seem challenging, but I wasn't sure if things
>>>> >             had changed since then or you had better ideas.
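The constraint in this use case (batches of at most 500 messages, at most one RPC call per second) comes down to two small calculations: how long to pace before the next call, and how many calls a backlog needs. A plain-Java sketch, independent of Beam, purely to make the arithmetic concrete:

```java
public class RpcPacer {
    // How long to wait before the next RPC call: if the previous call went
    // out less than intervalMs ago, wait the remainder; otherwise send now.
    static long delayBeforeNextCall(long lastCallMs, long nowMs, long intervalMs) {
        long elapsed = nowMs - lastCallMs;
        return elapsed >= intervalMs ? 0 : intervalMs - elapsed;
    }

    // Number of RPC calls needed for n pending messages in batches of <= max.
    static int callsNeeded(int n, int max) {
        return (n + max - 1) / max; // ceiling division
    }

    public static void main(String[] args) {
        // 1200 unread messages at a 500-per-call limit -> 3 calls, so draining
        // the backlog takes at least 2 seconds of pacing at 1 call/second.
        System.out.println(callsNeeded(1200, 500));        // prints 3
        System.out.println(delayBeforeNextCall(0, 400, 1000)); // prints 600
    }
}
```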
>>>> >
>>>> >             /~Vincent/
>>>> >
>>>> >
>>>> >             On Thu, Oct 1, 2020 at 2:57 PM Luke Cwik
>>>> >             <[email protected]> wrote:
>>>> >
>>>> >                 Look at the GroupIntoBatches[1] transform. It will
>>>> >                 buffer "batches" of size X for you.
>>>> >
>>>> >                 1:
>>>> >                 https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>>>> >
>>>> >                 On Thu, Oct 1, 2020 at 2:51 PM Vincent Marquez
>>>> >                 <[email protected]> wrote:
>>>> >
>>>> >                     the downstream consumer has these requirements.
>>>> >
>>>> >                     /~Vincent/
>>>> >
>>>> >
>>>> >                     On Thu, Oct 1, 2020 at 2:29 PM Luke Cwik
>>>> >                     <[email protected]> wrote:
>>>> >
>>>> >                         Why do you want to only emit X? (e.g. running
>>>> >                         out of memory in the runner)
>>>> >
>>>> >                         On Thu, Oct 1, 2020 at 2:08 PM Vincent Marquez
>>>> >                         <[email protected]> wrote:
>>>> >
>>>> >                             Hello all. If I want to 'throttle' the
>>>> >                             number of messages I pull off, say, Kafka
>>>> >                             or some other queue, in order to make sure
>>>> >                             I only emit X amount per trigger, is there
>>>> >                             a way to do that and ensure that I get 'at
>>>> >                             least once' delivery guarantees? If this
>>>> >                             isn't supported, would the better way be
>>>> >                             to pull the limited amount as opposed to
>>>> >                             doing it on the output side?
>>>> >
>>>> >                             /~Vincent/
>>>> >
>>>>
>>>
