*~Vincent*
On Tue, Dec 8, 2020 at 1:34 PM Boyuan Zhang <[email protected]> wrote:

> Hi Vincent,
>
> The Window.into(FixedWindows.of(Duration.standardMinutes(5))) operation
> only attaches window information to each element; it does not actually
> perform the grouping. In the commit transform, a combine transform
> (Max.longsPerKey()) is applied afterwards, so
> Window.into(FixedWindows.of(Duration.standardMinutes(5))) +
> Max.longsPerKey() means one output element per five minutes. This is
> different from your case, since the trigger in the CommitTransform is
> there only for the combine. And in order to prevent the data-loss error
> you mentioned, there is a persistence layer (Reshuffle) between the
> Kafka read and any downstream transform.

Apologies, I don't understand how the delay would work here, though. If we
have a Kafka topic with 100 messages in it, each with a timestamp one
minute apart, then one possible fetch output by the ReadFromKafkaDoFn spans
20 five-minute windows. I understand that Max.longsPerKey() will take the
max per window, but won't there still be 20 commits that happen as fast as
possible, one for each of the windows constructed from the initial fetch?

> For your case, would a pipeline like KafkaRead -> Reshuffle ->
> GroupIntoBatches -> downstream help you?

> On Tue, Dec 8, 2020 at 1:19 PM Vincent Marquez <[email protected]> wrote:
>
>> If this is the case, that the pipeline has no way of enforcing fixed
>> time windows, how does this work:
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L126
>>
>> Isn't this supposed to trigger only every five minutes, regardless of
>> how much data can immediately be grouped together into five-minute
>> windows? If there is a way to mark that the fixed window should only
>> trigger every so many minutes, that would solve my use case.
>> If there isn't a way to do this, the Kafka offset code seems broken and
>> could result in data loss by improperly committing offsets before they
>> have run through the rest of the pipeline.
>>
>> *~Vincent*
>>
>> On Fri, Oct 16, 2020 at 4:17 AM Maximilian Michels <[email protected]> wrote:
>>
>>> > the downstream consumer has these requirements.
>>>
>>> Blocking should normally be avoided at all cost, but if the downstream
>>> operator has the requirement to only emit a fixed number of messages
>>> per second, it should enforce this, i.e. block once the maximum number
>>> of messages for a time period has been reached. This will
>>> automatically lead to backpressure in runners like Flink or Dataflow.
>>>
>>> -Max
>>>
>>> On 07.10.20 18:30, Luke Cwik wrote:
>>>
>>>> SplittableDoFns apply to both batch and streaming pipelines. They are
>>>> allowed to produce an unbounded amount of data and can either
>>>> self-checkpoint, saying they want to resume later, or the runner will
>>>> ask them to checkpoint via a split call.
>>>>
>>>> There hasn't been anything concrete on backpressure. There has been
>>>> work done on exposing signals[1] related to IO that a runner can then
>>>> use intelligently, but throttling isn't one of them yet.
>>>>
>>>> 1: https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
>>>>
>>>> On Tue, Oct 6, 2020 at 3:51 PM Vincent Marquez <[email protected]> wrote:
>>>>
>>>>> Thanks for the response. Is my understanding correct that
>>>>> SplittableDoFns are only applicable to batch pipelines? I'm
>>>>> wondering if there are any proposals to address backpressure needs?
>>>>> /~Vincent/
>>>>>
>>>>> On Tue, Oct 6, 2020 at 1:37 PM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> There is no general backpressure mechanism within Apache Beam
>>>>>> (runners should be intelligent about this, but there is currently
>>>>>> no way to say "I'm being throttled," so runners don't know that
>>>>>> throwing more CPUs at a problem won't make it go faster).
>>>>>>
>>>>>> You can control how quickly you ingest data for runners that
>>>>>> support splittable DoFns with SDK-initiated checkpoints with
>>>>>> resume delays. A splittable DoFn is able to return
>>>>>> resume().withDelay(Duration.seconds(10)) from the @ProcessElement
>>>>>> method. See Watch[1] for an example.
>>>>>>
>>>>>> The 2.25.0 release enables more splittable DoFn features on more
>>>>>> runners. I'm working on a blog (initial draft[2], still mostly
>>>>>> empty) to update the old blog from 2017.
>>>>>>
>>>>>> 1: https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
>>>>>> 2: https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
>>>>>>
>>>>>> On Tue, Oct 6, 2020 at 10:39 AM Vincent Marquez <[email protected]> wrote:
>>>>>>
>>>>>>> Hmm, I'm not sure how that will help. I understand how to batch
>>>>>>> up the data, but it is the triggering part that I don't see how
>>>>>>> to do. For example, in Spark Structured Streaming you can set a
>>>>>>> time trigger that applies at a fixed interval all the way up to
>>>>>>> the source, so the source can even throttle how much data it
>>>>>>> reads.
>>>>>>> Here is my use case more thoroughly explained:
>>>>>>>
>>>>>>> I have a Kafka topic (with multiple partitions) that I'm reading
>>>>>>> from, and I need to aggregate batches of up to 500 messages
>>>>>>> before sending each batch off in a single RPC call. However, the
>>>>>>> vendor specified a rate limit, so if there are more than 500
>>>>>>> unread messages in the topic, I must wait 1 second before issuing
>>>>>>> another RPC call. When searching on Stack Overflow I found this
>>>>>>> answer: https://stackoverflow.com/a/57275557/25658, which makes
>>>>>>> it seem challenging, but I wasn't sure if things had changed
>>>>>>> since then or you had better ideas.
>>>>>>>
>>>>>>> /~Vincent/
>>>>>>>
>>>>>>> On Thu, Oct 1, 2020 at 2:57 PM Luke Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>>> Look at the GroupIntoBatches[1] transform. It will buffer
>>>>>>>> "batches" of size X for you.
>>>>>>>>
>>>>>>>> 1: https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>>>>>>>>
>>>>>>>> On Thu, Oct 1, 2020 at 2:51 PM Vincent Marquez <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> The downstream consumer has these requirements.
>>>>>>>>>
>>>>>>>>> /~Vincent/
>>>>>>>>>
>>>>>>>>> On Thu, Oct 1, 2020 at 2:29 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Why do you want to only emit X? (e.g. running out of memory
>>>>>>>>>> in the runner)
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 1, 2020 at 2:08 PM Vincent Marquez <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all. If I want to 'throttle' the number of messages I
>>>>>>>>>>> pull off, say, Kafka or some other queue, in order to make
>>>>>>>>>>> sure I only emit X amount per trigger, is there a way to do
>>>>>>>>>>> that and ensure that I get 'at least once' delivery
>>>>>>>>>>> guarantees?
>>>>>>>>>>> If this isn't supported, would the better approach be to
>>>>>>>>>>> limit the amount pulled at the source, as opposed to doing it
>>>>>>>>>>> on the output side?
>>>>>>>>>>>
>>>>>>>>>>> /~Vincent/
