Hello Kenneth,
Thank you so much for your answer.

I think I'm going to implement the side input method.
However, on the Spark Runner, the MapView is a simple HashMap[1], so I will 
have to allocate a lot of memory to my Spark executors.

As for the life cycle of the MySQL data, I am currently focusing on fixed data.
But if I end up with a solution where I have to query my DB once a week, that 
would be fine.

I am not familiar with the ParDo/state mechanism. I will take a look at it; that 
could be a solution that does not involve the MapView.

Again, thank you!
[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L322
--
Pierre Bailly-Ferry

On 14/01/2019 21:13, Kenneth Knowles wrote:
Hi Pierre,

Joins using the join library are always per-window, so that is not likely to 
work well for you. Actually side inputs are the right architecture. What you 
really want is a map side input (View.asMap()) that does not pull the whole Map 
into memory. It is up to the runner to implement this. I don't know how the 
SparkRunner implements a Map side input, but it may not be too hard to 
implement it so that the performance is how you want it.

On the other hand, there is a big question - does the MySQL database ever 
change? Countries change slowly, but I would guess that you might have other 
use cases too. The bounded data is read once at the very beginning of the 
pipeline's lifecycle.  If the stream lasts "forever" then you can end up with 
streaming data where the bounded data is out of date. The simplest way to 
explain it is that _changing_ data is unbounded. So then you need a streaming 
side input with some triggers to update it, and logic to find the "current" 
value for each key.
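
To make the "current value for each key" logic concrete, here is a minimal sketch in plain Java. It is not Beam API: the class LatestValuePerKey, the Update type and its timestamp field are hypothetical names used only for illustration, and it assumes that every change to a lookup row carries a timestamp. In a pipeline, the same rule would be applied to whatever the streaming side input delivers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: plain Java, no Beam API involved.
final class LatestValuePerKey {

    /** One change to a lookup row: the key, its new value, and when it changed. */
    static final class Update {
        final String key;
        final String value;
        final long timestampMillis; // assumed to be available on every change

        Update(String key, String value, long timestampMillis) {
            this.key = key;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Returns, for each key, the value of the update with the greatest timestamp. */
    static Map<String, String> currentValues(List<Update> updates) {
        Map<String, Update> newest = new HashMap<>();
        for (Update u : updates) {
            // Replace the stored update only if the incoming one is at least as recent.
            newest.merge(u.key, u,
                (old, cur) -> cur.timestampMillis >= old.timestampMillis ? cur : old);
        }
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, Update> e : newest.entrySet()) {
            result.put(e.getKey(), e.getValue().value);
        }
        return result;
    }
}
```

For example, feeding it ("CZ", "Czech Republic", 100) and ("CZ", "Czechia", 200) leaves "Czechia" as the current value for CZ, whichever of the two updates arrives first.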

One approach is to just query your MySQL database from the streaming 
ParDo(DoFn), if that gives you the consistency guarantees you are looking for. 
You then may want to use state & timers to batch your requests.
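
The batching idea can be sketched in plain Java as follows. This is only an illustration under assumptions, not Beam or JDBC code: BatchingLookup and the bulkQuery function are hypothetical stand-ins, where bulkQuery represents a single round trip to MySQL for a list of keys. In a real DoFn, the pending keys would live in state and a timer would trigger the final flush.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only: plain Java, no Beam or JDBC API involved.
final class BatchingLookup {
    private final int maxBatchSize;
    // Stand-in for one database round trip that resolves many keys at once.
    private final Function<List<String>, Map<String, String>> bulkQuery;
    // In a Beam DoFn this buffer would be kept in state rather than in a field.
    private final List<String> pendingKeys = new ArrayList<>();

    BatchingLookup(int maxBatchSize,
                   Function<List<String>, Map<String, String>> bulkQuery) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.bulkQuery = bulkQuery;
    }

    /** Buffers a key; returns lookup results once the batch is full, else an empty map. */
    Map<String, String> add(String key) {
        pendingKeys.add(key);
        return pendingKeys.size() >= maxBatchSize ? flush() : new HashMap<>();
    }

    /** Resolves all buffered keys in one request; a timer would call this in a pipeline. */
    Map<String, String> flush() {
        if (pendingKeys.isEmpty()) {
            return new HashMap<>();
        }
        Map<String, String> result = bulkQuery.apply(new ArrayList<>(pendingKeys));
        pendingKeys.clear();
        return result;
    }
}
```

With a batch size of 2, three incoming keys cause one query when the second key arrives, and the third key waits in the buffer until flush() is called.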

Kenn

On Mon, Jan 14, 2019 at 4:40 AM Pierre Bailly Ferry 
<pbai...@talend.com> wrote:
Hello everyone,

I am a developer at Talend, working on the Data Streams project.

I am having trouble using Join[1] with an unbounded source on the Spark 
Runner.

I want to achieve the following use case:
I have a Kafka source (so, an unbounded source) and I want to enrich its 
data with data from a database (a bounded source).

Join seems to be the right solution for my use case, because:
1) I preprocess the data from my database, so I cannot just call my database 
over and over as suggested in the Beam pattern [2];
2) my database is big, and I want to avoid having the whole database on every 
single node of my cluster, so I cannot use a simple side input.

I tried a basic approach, but I very quickly hit an error in the 
GroupByKey:

java.lang.IllegalStateException: Inputs to Flatten had incompatible window 
windowFns:
org.apache.beam.sdk.transforms.windowing.SlidingWindows@49d96f, 
org.apache.beam.sdk.transforms.windowing.GlobalWindows

I think the correct way to fix this problem is to transform my GlobalWindow 
into a SlidingWindow. So, my goal is to implement the following pattern:
[Image: a window that accumulates data on the bounded input]
Sadly, I am not able to create the right-hand window correctly. My bounded 
input creates one window and never repeats itself:
[Image: same diagram, but with an invalid window]
I tried multiple things for the right-hand window:
1) Using AfterWatermark:
       PCollection<KV<String, String>> lookupKV = extractKey(databaseInput)
            .apply("Sliding Window from Bounded Source",
                Window.<KV<String, String>> into(
                        SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(5)))
                    .triggering(
                            AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardSeconds(5)))
                    )
                    .withAllowedLateness(Duration.standardSeconds(5))
                    .accumulatingFiredPanes());
2) Using Repeatedly:
       PCollection<KV<String, String>> lookupKV = extractKey(databaseInput)
            .apply("Sliding Window from Bounded Source",
                Window.<KV<String, String>> into(
                        SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(5)))
                    .triggering(
                            Repeatedly.forever(
                                AfterWatermark.pastEndOfWindow()
                                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                            )
                    )
                    .withAllowedLateness(Duration.standardSeconds(5))
                    .accumulatingFiredPanes());
3) Just the sliding window (because why not, after all):
       PCollection<KV<String, String>> lookupKV = extractKey(databaseInput)
            .apply("Sliding Window from Bounded Source",
                Window.<KV<String, String>> into(
                        SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(5)))
                    .accumulatingFiredPanes());
Nothing works. I always get one microbatch of data, then nothing more.

The root of my problem is that I cannot find a trigger that is activated by 
time alone.
I have access to "AfterWatermark" (which requires the start of a window, and 
therefore a newly processed record), "AfterProcessingTime" (which requires a 
first element to start the countdown) or "AfterPane" (which requires at least 
one new element, so I cannot fake it with 0 elements). None of them is purely 
time-based.
Repeatedly cannot be used without an underlying trigger, so it is not the 
solution either.

Do you see any way to achieve the Batch -> Streaming windowing that I want to 
do?
Side question: is there a way to preserve data locality between two windows, 
so that I do not move the data from my right input over and over?

tl;dr: I am unable to enrich my Kafka input with static data from my database.

[1] https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/extensions/joinlibrary/Join.html
[2] https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-2

Thanks,
--
Pierre Bailly-Ferry


