Hi Pierre,

Joins using the join library are always per-window, so that is not likely
to work well for you. Actually, side inputs are the right architecture. What
you really want is a map side input (View.asMap()) that does not pull the
whole Map into memory. It is up to the runner to implement this. I don't
know how the SparkRunner implements a Map side input, but it may not be too
hard to implement it so that it performs the way you need.

On the other hand, there is a big question: does the MySQL database ever
change? Countries change slowly, but I would guess that you might have other
use cases too. The bounded data is read once at the very beginning of the
pipeline's lifecycle.  If the stream lasts "forever" then you can end up
with streaming data where the bounded data is out of date. The simplest way
to explain it is that _changing_ data is unbounded. So then you need a
streaming side input with some triggers to update it, and logic to find the
"current" value for each key.
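
To make the "current value for each key" logic concrete, here is a minimal
sketch in plain Java, not Beam code. The class CurrentValueLookup, the Update
type and its fields are hypothetical names for illustration, and I am assuming
each change record carries a timestamp that says when it took effect. In a
real pipeline the same fold would run over the elements of the streaming side
input.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names are Beam APIs.
class CurrentValueLookup {

    // One change record from the lookup table (assumed shape).
    static final class Update {
        final String key;
        final String value;
        final long timestampMillis;

        Update(String key, String value, long timestampMillis) {
            this.key = key;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    // Folds a sequence of updates into the "current" value per key.
    // The update with the greatest timestamp wins; on a tie, the one
    // that arrives later in the list wins.
    static Map<String, String> currentValues(List<Update> updates) {
        Map<String, Update> latest = new HashMap<>();
        for (Update u : updates) {
            latest.merge(u.key, u, (old, neu) ->
                neu.timestampMillis >= old.timestampMillis ? neu : old);
        }
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, Update> e : latest.entrySet()) {
            result.put(e.getKey(), e.getValue().value);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Update> updates = Arrays.asList(
            new Update("FR", "France", 1L),
            new Update("FR", "French Republic", 5L),
            new Update("FR", "out-of-order old value", 3L));
        // The out-of-order record at t=3 does not replace the one at t=5.
        System.out.println(currentValues(updates));
    }
}
```

The point of comparing timestamps rather than taking the last element seen is
that triggers may deliver updates out of order.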

One approach is to just query your MySQL database from the streaming
ParDo(DoFn), if that gives you the consistency guarantees you are looking
for. You then may want to use state & timers to batch your requests.
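
As a rough illustration of the batching idea, here is a plain Java sketch, not
Beam code. BatchingEnricher, bulkLookup and maxBatchSize are hypothetical
names; bulkLookup stands in for one MySQL query that fetches many keys at
once. In a DoFn, the buffer would live in state and flush() would be called
from a timer callback as well as when the batch fills up.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch: buffers keys and resolves them with one bulk lookup.
class BatchingEnricher {
    private final int maxBatchSize;
    // Stand-in for a single database round trip covering many keys.
    private final Function<Set<String>, Map<String, String>> bulkLookup;
    private final List<String> buffered = new ArrayList<>();

    BatchingEnricher(int maxBatchSize,
                     Function<Set<String>, Map<String, String>> bulkLookup) {
        this.maxBatchSize = maxBatchSize;
        this.bulkLookup = bulkLookup;
    }

    // Buffers one key. Returns enriched records if the batch just filled
    // up, otherwise an empty list.
    List<String> add(String key) {
        buffered.add(key);
        return buffered.size() >= maxBatchSize ? flush() : new ArrayList<>();
    }

    // Resolves everything buffered so far with a single lookup call.
    // A timer would call this so that a partial batch does not wait forever.
    List<String> flush() {
        List<String> out = new ArrayList<>();
        if (buffered.isEmpty()) {
            return out;
        }
        // De-duplicate keys so each one is requested once per batch.
        Map<String, String> found = bulkLookup.apply(new LinkedHashSet<>(buffered));
        for (String key : buffered) {
            out.add(key + "=" + found.getOrDefault(key, "<unknown>"));
        }
        buffered.clear();
        return out;
    }
}
```

With maxBatchSize set to 2, two calls to add() cost one lookup instead of two.
Whether that is worth the added latency depends on your request rate.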

Kenn

On Mon, Jan 14, 2019 at 4:40 AM Pierre Bailly Ferry <[email protected]>
wrote:

> Hello everyone,
>
> I am a developer at Talend for the project Data Streams.
>
> I am having trouble using the Join[1] with an unbounded source on the
> Spark Runner.
>
> I want to achieve the following use case:
> I have a Kafka source (so an unbounded source) and I want to enrich
> this data with data from a database (a bounded source).
>
> The Join seems to be the right solution for my use case, because:
> 1) I preprocess data from my database, so I cannot just call my database
> over and over as suggested in the Beam pattern [2];
> 2) my database is big, and I want to avoid having the whole database on
> every single node of my cluster, so I cannot use a simple side input.
>
> I tried a basic approach, but I quickly hit an error in the
> GroupByKey:
>
> java.lang.IllegalStateException: Inputs to Flatten had incompatible window 
> windowFns:
> org.apache.beam.sdk.transforms.windowing.SlidingWindows@49d96f, 
> org.apache.beam.sdk.transforms.windowing.GlobalWindows
>
> I think the correct way to fix this problem is to transform my
> GlobalWindow into a SlidingWindow. So, my goal is to implement the
> following pattern:
> [image: Using a window that accumulate data on the bounded input]
> Sadly, I am not able to create the Right Window correctly. My bounded
> input will create one window and will never repeat itself:
> [image: Same image, but with an invalid window]
> I tried multiple things for the right window:
> 1) Using AfterWatermark:
>
>        PCollection<KV<String, String>> lookupKV = extractKey(databaseInput)
>             .apply("Sliding Window from Bounded Source",
>                 Window.<KV<String, String>> into(
>                         SlidingWindows.of(Duration.standardSeconds(5))
>                             .every(Duration.standardSeconds(5)))
>                     .triggering(
>                         AfterWatermark.pastEndOfWindow()
>                             .withEarlyFirings(
>                                 AfterProcessingTime.pastFirstElementInPane()
>                                     .plusDelayOf(Duration.standardSeconds(5))))
>                     .withAllowedLateness(Duration.standardSeconds(5))
>                     .accumulatingFiredPanes());
>
> 2) Using Repeatedly:
>
>        PCollection<KV<String, String>> lookupKV = extractKey(databaseInput)
>             .apply("Sliding Window from Bounded Source",
>                 Window.<KV<String, String>> into(
>                         SlidingWindows.of(Duration.standardSeconds(5))
>                             .every(Duration.standardSeconds(5)))
>                     .triggering(
>                         Repeatedly.forever(
>                             AfterWatermark.pastEndOfWindow()
>                                 .withEarlyFirings(
>                                     AfterProcessingTime.pastFirstElementInPane())))
>                     .withAllowedLateness(Duration.standardSeconds(5))
>                     .accumulatingFiredPanes());
>
> 3) Just the sliding window (because why not, after all):
>
> PCollection<KV<String, String>> lookupKV = extractKey(databaseInput)
>       .apply("Sliding Window from Bounded Source",
>           Window.<KV<String, String>> into(
>                   SlidingWindows.of(Duration.standardSeconds(5))
>                       .every(Duration.standardSeconds(5)))
>               .accumulatingFiredPanes());
>
> Nothing works. I always get one microbatch of data, then nothing more.
>
> The root of my problem is that I cannot find a trigger that is activated
> by time alone.
> I have access to "AfterWatermark" (which requires the start of a window,
> so a new record processed), "AfterProcessingTime" (which requires a first
> element to start the countdown) and "AfterPane" (which requires at least
> one new element, so I cannot fake it with 0 elements). None of them is
> purely time based.
> Repeatedly cannot be used without a trigger, so it is not the solution
> either.
>
> Do you see any way to achieve the Batch -> Streaming windowing that I want
> to do?
> Side question: is there a way to keep data locality between two
> windows, so I do not move data from my right input over and over?
>
> tl;dr: I am unable to enrich my Kafka input with static data from my
> database.
>
> [1]
> https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/extensions/joinlibrary/Join.html
> [2]
> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-2
>
> Thanks,
> --
> Pierre Bailly-Ferry
