*~Vincent*

On Tue, Dec 8, 2020 at 1:34 PM Boyuan Zhang <[email protected]> wrote:

> Hi Vincent,
>
> The Window.into(FixedWindows.of(Duration.standardMinutes(5))) operation
> just attaches window information to each element; it doesn't actually
> perform the grouping. In the commit transform there is also a combine
> transform, Max.longsPerKey().
> Window.into(FixedWindows.of(Duration.standardMinutes(5))) + Max.longsPerKey()
> together mean one output element per key per 5 minutes. This is different
> from your case, since the windowing in the CommitTransform exists only to
> drive that combine. And to prevent the data-loss problem you mentioned,
> there is a persistence layer (a Reshuffle) between the Kafka read and any
> downstream transform.
>
>
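For concreteness, the pattern described above looks roughly like this in
Beam Java (a sketch, not the actual KafkaCommitOffset source; the input
fragment and CommitOffsetsFn are hypothetical stand-ins for the offset
collection and the DoFn that performs the commit):

    import org.apache.beam.sdk.transforms.Max;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // offsets: key = topic/partition, value = consumed offset.
    PCollection<KV<String, Long>> offsets = ...;

    offsets
        // Attach 5-minute fixed-window information to each element;
        // no grouping happens at this step.
        .apply(Window.<KV<String, Long>>into(
            FixedWindows.of(Duration.standardMinutes(5))))
        // Combine: keep only the highest offset per key in each window,
        // so at most one element per key per 5-minute window comes out.
        .apply(Max.longsPerKey())
        // Hypothetical DoFn that issues the actual offset commit.
        .apply(ParDo.of(new CommitOffsetsFn()));
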
Apologies, I still don't understand how the delay would work here.  If we
have a Kafka topic that has 100 messages in it, each with a timestamp one
minute apart, that means 20 five-minute windows will be generated from one
possible fetch output by the ReadFromKafkaDoFn.  I understand that
Max.longsPerKey() will take the max per window, but won't there still be 20
commits, one per window, all happening as fast as possible for the windows
that were constructed from the initial fetch?

> For your case, would a pipeline like KafkaRead -> Reshuffle ->
> GroupIntoBatches -> downstream help you?
>
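A minimal sketch of that suggested shape, assuming a String-keyed topic
(the broker address, topic name, and SendRpcBatchFn are placeholders):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.StringDeserializer;

    PCollection<KV<String, String>> records =
        pipeline.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")   // placeholder
            .withTopic("my-topic")                 // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata());

    records
        // Persistence point: decouples the Kafka read from downstream,
        // so downstream retries don't re-read from Kafka.
        .apply(Reshuffle.of())
        // Buffer up to 500 elements per key before emitting a batch.
        .apply(GroupIntoBatches.ofSize(500))
        // Hypothetical downstream DoFn that makes the RPC call per batch.
        .apply(ParDo.of(new SendRpcBatchFn()));
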
> On Tue, Dec 8, 2020 at 1:19 PM Vincent Marquez <[email protected]>
> wrote:
>
>> If this is the case that the pipeline has no way of enforcing fixed time
>> windows, how does this work:
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L126
>>
>> Isn't this supposed to trigger only every five minutes, regardless of how
>> much data can immediately be grouped together into five-minute windows?  If
>> there is a way to mark that a fixed window should only trigger every so
>> many minutes, that would solve my use case.  If there isn't a way to do
>> this, doesn't the Kafka offset code seem broken, in that it could result in
>> 'data loss' by committing offsets before they have run through the rest of
>> the pipeline?
>>
>> *~Vincent*
>>
>>
>> On Fri, Oct 16, 2020 at 4:17 AM Maximilian Michels <[email protected]>
>> wrote:
>>
>>> > the downstream consumer has these requirements.
>>>
>>> Blocking should normally be avoided at all costs, but if the downstream
>>> operator has the requirement to emit only a fixed number of messages per
>>> second, it should enforce this itself, i.e. block once the maximum number
>>> of messages for a time period has been reached. This will automatically
>>> lead to backpressure in runners like Flink or Dataflow.
>>>
>>> -Max
>>>
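One naive way to implement the blocking Max describes is to sleep inside
@ProcessElement (a sketch only: the counter is per DoFn instance, so the
effective global rate is maxPerSecond times the parallelism, and sleeping
holds the bundle open):

    import org.apache.beam.sdk.transforms.DoFn;

    // Crude per-instance rate limiter: once the one-second budget is
    // spent, sleep until the next second starts. The resulting stall
    // surfaces as backpressure in runners like Flink or Dataflow.
    class RateLimitFn<T> extends DoFn<T, T> {
      private final long maxPerSecond;
      private transient long windowStart;
      private transient long count;

      RateLimitFn(long maxPerSecond) {
        this.maxPerSecond = maxPerSecond;
      }

      @ProcessElement
      public void process(@Element T element, OutputReceiver<T> out)
          throws InterruptedException {
        long now = System.currentTimeMillis();
        if (now - windowStart >= 1000) {
          windowStart = now;   // new one-second budget window
          count = 0;
        }
        if (count >= maxPerSecond) {
          // Budget spent: block until the current second rolls over.
          Thread.sleep(1000 - (now - windowStart));
          windowStart = System.currentTimeMillis();
          count = 0;
        }
        count++;
        out.output(element);
      }
    }
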
>>> On 07.10.20 18:30, Luke Cwik wrote:
>>> > SplittableDoFns apply to both batch and streaming pipelines. They are
>>> > allowed to produce an unbounded amount of data and can either
>>> > self-checkpoint, saying they want to resume later, or the runner will
>>> > ask them to checkpoint via a split call.
>>> >
>>> > There hasn't been anything concrete on backpressure. There has been
>>> > work done on exposing signals[1] related to IO that a runner can then
>>> > use intelligently, but throttling isn't one of them yet.
>>> >
>>> > 1:
>>> > https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
>>> >
>>> > On Tue, Oct 6, 2020 at 3:51 PM Vincent Marquez
>>> > <[email protected] <mailto:[email protected]>> wrote:
>>> >
>>> >     Thanks for the response.  Is my understanding correct that
>>> >     SplittableDoFns are only applicable to batch pipelines?  I'm
>>> >     wondering if there are any proposals to address backpressure needs?
>>> >     /~Vincent/
>>> >
>>> >
>>> >     On Tue, Oct 6, 2020 at 1:37 PM Luke Cwik <[email protected]> wrote:
>>> >
>>> >         There is no general backpressure mechanism within Apache Beam
>>> >         (runners should be intelligent about this, but there is
>>> >         currently no way to say "I'm being throttled", so runners
>>> >         don't know that throwing more CPUs at a problem won't make it
>>> >         go faster).
>>> >
>>> >         You can control how quickly you ingest data on runners that
>>> >         support splittable DoFns with SDK-initiated checkpoints and
>>> >         resume delays. A splittable DoFn is able to return
>>> >         resume().withResumeDelay(Duration.standardSeconds(10)) from
>>> >         the @ProcessElement method. See Watch[1] for an example.
>>> >
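A trimmed sketch of that shape (the surrounding DoFn boilerplate,
@GetInitialRestriction and friends, is omitted, and readNext is a
hypothetical helper):

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
    import org.joda.time.Duration;

    // Inside a splittable DoFn: claim one position, emit its output,
    // then self-checkpoint and ask the runner to resume ~10s later.
    @ProcessElement
    public ProcessContinuation process(
        @Element String source,
        RestrictionTracker<OffsetRange, Long> tracker,
        OutputReceiver<String> out) {
      long position = tracker.currentRestriction().getFrom();
      if (!tracker.tryClaim(position)) {
        return ProcessContinuation.stop();
      }
      out.output(readNext(source, position)); // hypothetical read
      return ProcessContinuation.resume()
          .withResumeDelay(Duration.standardSeconds(10));
    }
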
>>> >         The 2.25.0 release enables more splittable DoFn features on
>>> >         more runners. I'm working on a blog (initial draft[2], still
>>> >         mostly empty) to update the old blog from 2017.
>>> >
>>> >         1:
>>> >         https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
>>> >         2:
>>> >         https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
>>> >
>>> >
>>> >         On Tue, Oct 6, 2020 at 10:39 AM Vincent Marquez
>>> >         <[email protected]> wrote:
>>> >
>>> >             Hmm, I'm not sure how that will help; I understand how to
>>> >             batch up the data, but it is the triggering part that I
>>> >             don't see how to do.  For example, in Spark Structured
>>> >             Streaming, you can set a time trigger that fires at a
>>> >             fixed interval all the way up to the source, so the source
>>> >             can even throttle how much data to read.
>>> >
>>> >             Here is my use case more thoroughly explained:
>>> >
>>> >             I have a Kafka topic (with multiple partitions) that I'm
>>> >             reading from, and I need to aggregate batches of up to 500
>>> >             before sending a single batch off in an RPC call.  However,
>>> >             the vendor specified a rate limit, so if there are more
>>> >             than 500 unread messages in the topic, I must wait 1 second
>>> >             before issuing another RPC call. When searching on Stack
>>> >             Overflow I found this answer:
>>> >             https://stackoverflow.com/a/57275557/25658 that makes it
>>> >             seem challenging, but I wasn't sure if things had changed
>>> >             since then or you had better ideas.
>>> >
>>> >             /~Vincent/
>>> >
>>> >
>>> >             On Thu, Oct 1, 2020 at 2:57 PM Luke Cwik <[email protected]> wrote:
>>> >
>>> >                 Look at the GroupIntoBatches[1] transform. It will
>>> >                 buffer "batches" of size X for you.
>>> >
>>> >                 1:
>>> >                 https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>>> >
>>> >
>>> >                 On Thu, Oct 1, 2020 at 2:51 PM Vincent Marquez
>>> >                 <[email protected]> wrote:
>>> >
>>> >                     the downstream consumer has these requirements.
>>> >
>>> >                     /~Vincent/
>>> >
>>> >
>>> >                     On Thu, Oct 1, 2020 at 2:29 PM Luke Cwik
>>> >                     <[email protected]> wrote:
>>> >
>>> >                         Why do you want to only emit X? (e.g. running
>>> >                         out of memory in the runner)
>>> >
>>> >                         On Thu, Oct 1, 2020 at 2:08 PM Vincent Marquez
>>> >                         <[email protected]> wrote:
>>> >
>>> >                             Hello all.  If I want to 'throttle' the
>>> >                             number of messages I pull off, say, Kafka
>>> >                             or some other queue, in order to make sure
>>> >                             I only emit X amount per trigger, is there
>>> >                             a way to do that and still ensure that I
>>> >                             get 'at least once' delivery guarantees?
>>> >                             If this isn't supported, would it be
>>> >                             better to limit the amount pulled, as
>>> >                             opposed to doing it on the output side?
>>> >
>>> >                             /~Vincent/
>>> >
>>>
>>
