I see. This is an interesting use case, and it seems quite common. I think the
issue is that we are trying to use a trigger in a global window of an
unbounded stream, which somehow leads Beam to use an
AfterSynchronizedProcessingTime trigger. Would it be OK for you to
change the global window to a window that is large enough, such as a few days,
to simulate a global window? Or have you considered making it a 5-second fixed
window, so that updates are emitted within 5 seconds and we don't need to use
the global window with a trigger?
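
As a rough sketch of the second option, assuming the changelog is a
PCollection<KV<String, String>> (the class name, method name, step names, and
key/value types below are hypothetical and not taken from the gist), the
windowing could look something like this:

```java
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class FixedWindowLatest {

  // Hypothetical helper: 'changelog' stands in for the keyed changelog stream.
  // Assumption: String keys and values; adjust the types to the real pipeline.
  static PCollection<KV<String, String>> latestPerKey(
      PCollection<KV<String, String>> changelog) {
    return changelog
        // 5-second fixed windows instead of a global window with a
        // processing-time trigger.
        .apply(
            "FixedWindow5s",
            Window.<KV<String, String>>into(
                FixedWindows.of(Duration.standardSeconds(5))))
        // Keep only the most recent value for each key within each window.
        .apply("LatestPerKey", Latest.<String, String>perKey());
  }
}
```

Since no explicit trigger is set, each window should use Beam's default
trigger and emit the latest value per key once the watermark passes the end of
the window, so no processing-time trigger is involved.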

Thanks,
Xinyu

On Fri, Feb 12, 2021 at 7:14 AM Jan Bensien <stu128...@mail.uni-kiel.de>
wrote:

> Hey,
>
> Thank you for your help. With it, I was able to identify the
> problem within my code: my trigger followed by a
> Latest.perKey transformation results in an
> AfterSynchronizedProcessingTime trigger.
> A minimal example of this problem is the following:
> https://gist.github.com/janb15/8c536dbdbc1be40e2698122257f74760
> I did try this within the wordcount project using version 2.27.0, but this
> did not fix the problem.
> The use case that I want to implement uses a changelog stream to enrich
> my input with further data using side inputs. The stream that is used
> to generate the side input is only updated when a user performs a manual
> re-configuration. Because of this, I use a trigger on a global window to
> periodically generate a view. Is there a way to circumvent the creation
> of a synchronized processing time trigger?
>
> With many thanks,
>
> Jan
>
> Am 2021-02-10 00:57, schrieb Xinyu Liu:
>
> > Jan,
> >
> > I tried the latest Beam 2.27 version and ran into the same issue that you
> > saw. I dug a bit deeper, and it was caused by the recent changes in Beam to
> > enable SplittableParDo in all runners. While we are going to work with Beam
> > to get this resolved, you can avoid the issue by adding the argument
> > "--experiments=use_deprecated_read" when running your program. This flag
> > disables the new code path so that it works as before.
> >
> > I also tried your triggering code in the KafkaWordCount example in the
> > samza-beam-examples git repo (https://github.com/apache/samza-beam-examples).
> > It seems to work for me: I can see the 1-second early firing within a
> > 10-second window, and the fired panes are accumulated. You can also use this
> > git repo as a reference.
> >
> > Thanks,
> > Xinyu
> >
> > On Mon, Feb 8, 2021 at 9:41 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
> >
> > Hi, Jan,
> >
> > Thanks for reporting this issue to us. Processing time triggers are
> > supported in the Samza Runner in Beam 2.22.0 [1]. The
> > exception message wasn't updated after we added support for processing
> > time. Apologies for the confusion here. It looks like most of the exception
> > messages have been fixed in the latest code.
> >
> > From reading the code, it seems we only run into this exception if we
> > somehow end up with a TimeDomain of synchronized_processing_time [2]. The
> > Samza runner does not support this time domain. Are you aware of any way
> > your code might use it? If not, I can help debug further. We have other
> > users who use processing time triggers for early triggering, and it has
> > been working fine for them.
> >
> > I will also take a look at 2.27.0. LinkedIn recently upgraded to
> > 2.26.0, and we found a few issues. Previously we were using a version close
> > to 2.24.0.
> >
> > Thanks,
> > Xinyu
> >
> > [1] https://github.com/apache/beam/blob/9b43fadb8bb6f4bcabc945fc299b378eb1d7d205/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java#L347
> > [2] https://github.com/apache/beam/blob/055140203ce2df56ba903b05266466cf16562dde/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java#L49
> >
> > On Sun, Feb 7, 2021 at 2:24 PM Jan Bensien <stu128...@mail.uni-kiel.de>
> > wrote:
> >
> > Hello,
> >
> > I am currently trying to execute my Beam Pipelines using the Samza
> > Runner. I am using processing time triggers for calculating early
> > results for my larger windows.
> > However, I am getting the following error:
> > java.lang.UnsupportedOperationException: class
> > org.apache.beam.runners.samza.SamzaRunner currently only supports event
> > time.
> > Looking at the capability matrix of Beam
> > (https://beam.apache.org/documentation/runners/capability-matrix/),
> > it looks like processing time should be supported.
> > I could not find any documentation of the exact supported features for
> > the different runner versions.
> > I am using version 2.22.0 of the Samza Runner, but I also tried 2.25.0
> > and got the same error. When I tried to upgrade to 2.27.0, I got the
> > following error: java.lang.UnsupportedOperationException:
> > BundleFinalizer unsupported in Samza. This happens whenever I use
> > KafkaIO to read from Kafka, even when I tried a pipeline that did
> > nothing except read from Kafka.
> >
> > The trigger that caused the exception is the following:
> > .triggering(Repeatedly.forever(
> >     AfterProcessingTime.pastFirstElementInPane()
> >         .plusDelayOf(Duration.standardSeconds(1))))
> > .accumulatingFiredPanes());
> >
> > Running the pipeline with the Direct Runner worked fine. Which version
> > is the latest stable version of the Samza Runner and does it support
> > processing time triggers?
> >
> > With many thanks,
> >
> > Jan
