If a bounded vs. unbounded JOIN is performed by treating the bounded
PCollection as a side input, then the bounded PCollection has to fit into
memory. Am I right? In that case, a bounded PCollection read from Hive or
HDFS, where the data might not fit into memory, cannot be joined with Kafka?
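
To make the memory concern concrete, here is a minimal plain-Java sketch of
what I understand a side-input join to amount to conceptually: the bounded
side is materialized as an in-memory lookup, and every element of the
unbounded side is matched against it. This is not Beam code and not how
BeamJoinRel is implemented. The class name SideInputJoinSketch, the method
names, and the sample rows are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: a hash join where one side is held in memory.
class SideInputJoinSketch {

    // Materializes the bounded side as a map from join key to value.
    // The entire bounded input ends up in this map, which is why it
    // would have to fit into memory.
    static Map<String, String> buildLookup(List<String[]> boundedRows) {
        Map<String, String> lookup = new HashMap<>();
        for (String[] row : boundedRows) {
            lookup.put(row[0], row[1]); // row[0] = join key, row[1] = value
        }
        return lookup;
    }

    // Inner join: each streamed (key, payload) element is looked up in the
    // map; elements without a matching key are dropped.
    static List<String> join(List<String[]> streamElements,
                             Map<String, String> lookup) {
        List<String> joined = new ArrayList<>();
        for (String[] element : streamElements) {
            String match = lookup.get(element[0]);
            if (match != null) {
                joined.add(element[0] + "," + element[1] + "," + match);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Stand-in for the bounded (Hive/HDFS) side.
        Map<String, String> lookup = buildLookup(Arrays.<String[]>asList(
                new String[] {"u1", "gold"},
                new String[] {"u2", "silver"}));

        // Stand-in for one window of the unbounded (Kafka) side.
        List<String[]> window = Arrays.<String[]>asList(
                new String[] {"u1", "click"},
                new String[] {"u3", "view"});

        System.out.println(join(window, lookup));
    }
}
```

If that picture is right, the size of the map is bounded only by the size of
the Hive/HDFS data, which is the part I am worried about.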

Does this approach have something to do with watermarks? The computation
might take time depending on the size of the bounded data, and the window
might expire before its result is emitted.

Thanks,
Rahul

On Thu, Jul 18, 2019 at 10:13 PM Rui Wang <ruw...@google.com> wrote:

> The idea is that the slowly changing table is treated as a
> PCollectionView, which leads to a side-input join implementation in which
> you join an unbounded windowed stream (Kafka) with a trigger-based side
> input (the slowly changing data). That's how it follows the pattern. If you
> are considering the case of windowed data joined with windowed data, in
> which one side is slowly changing, it sounds like you only need to window
> the PCollectionView as needed (fixed windowing, for example; we also need
> to double-check whether a side input can be triggered on a non-global
> window). In this case the windowing strategy seems to have to be consistent
> for both sides.
>
> In BeamSQL, if one side of a binary join is a bounded PCollection, BeamSQL
> already constructs a side-input join (you can check [1] for implementation
> details). It's even more straightforward here: if one side of the join is
> a PCollectionView, go with a side-input join.
>
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L185
>
> -Rui
>
> On Thu, Jul 18, 2019 at 7:52 AM rahul patwari <rahulpatwari8...@gmail.com>
> wrote:
>
>> Hi Rui,
>>
>> I have a query about BEAM-7758.
>> If [Pattern: slowly changing lookup cache] is followed to define and
>> construct the lookup table, and the table is set on SqlTransform, then an
>> aggregation (JOIN) between, say, a windowed Kafka PCollection table and
>> the lookup table cannot be performed unless both PCollections have
>> matching WindowFns, because both are unbounded.
>> What can be done to treat the lookup table as a bounded PCollection and
>> perform the aggregation with every window of Kafka's PCollection?
>>
>> Thanks,
>> Rahul
>>
>>
>> On Wed, Jul 17, 2019 at 1:06 AM Rui Wang <ruw...@google.com> wrote:
>>
>>> Another approach is to let BeamSQL support it natively, as the title of
>>> this thread says: "as a Table in BeamSQL".
>>>
>>> We might be able to define a table with properties that say this table
>>> returns a PCollectionView. By doing so we will have a trigger-based
>>> PCollectionView available in SQL rel nodes, so SQL will be able to
>>> implement [Pattern: Slowly-changing lookup cache]. This way, users
>>> only need to construct a table and set it on SqlTransform
>>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>.
>>>
>>> Created a JIRA to track this idea:
>>> https://jira.apache.org/jira/browse/BEAM-7758
>>>
>>>
>>> -Rui
>>>
>>>
>>> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <r...@google.com> wrote:
>>>
>>>> Hi Rahul,
>>>>
>>>> FYI, that pattern is also available in the Beam docs (with an updated
>>>> code example):
>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>>>
>>>> Please note in the DoFn that feeds the View.asSingleton() you will need
>>>> to manually call BigQuery using the BigQuery client.
>>>>
>>>> Regards
>>>>
>>>> Reza
>>>>
>>>> On Tue, 16 Jul 2019 at 14:37, rahul patwari <rahulpatwari8...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>>
>>>>> We have a use case to read slowly changing bounded data as a
>>>>> PCollection along with the main PCollection from Kafka (windowed) and
>>>>> use it in the query of BeamSql.
>>>>>
>>>>> Is it possible to design such a use case with Beam Java SDK?
>>>>>
>>>>> Approaches tried without success:
>>>>>
>>>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>>>> Transform (which applies Beam I/O on the
>>>>> pipeline [PCollection.getPipeline()]) => Convert the resulting
>>>>> PCollection to PCollection<Row> => Apply BeamSQL
>>>>> Comments: Beam I/O reads data only once even though a long value is
>>>>> generated periodically by GenerateSequence. The expectation is that
>>>>> whenever a long value is generated, Beam I/O will be used to read the
>>>>> latest data. Is this because of optimizations in the DAG? Can the
>>>>> optimizations be overridden?
>>>>>
>>>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>>>> the PCollection.
>>>>> Comments: The output PCollection is unbounded. But we need a bounded
>>>>> PCollection, as this PCollection is used to JOIN with the PCollection
>>>>> of each window from Kafka. How can we convert an unbounded PCollection
>>>>> to a bounded PCollection inside a DoFn?
>>>>>
>>>>> Are there any better Approaches?
>>>>>
>>>>> Regards,
>>>>> Rahul
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
