Regarding ordering: anything that requires inputs to arrive in a specific
order in Beam will be problematic due to the nature of parallel processing -
you will always get race conditions.

Assuming you still intend to Flatten the BigQuery and Pub/Sub
PCollections, using Wait.on() before flattening the two PCollections will not
make much difference, as there is still a strong likelihood that the BQ and
Pub/Sub records will be interleaved in the flattened output.

If the BQ read is to update internal state, then I assume that you need to
store that state somewhere in your BusinessLogic DoFn. If this storage
is in RAM, then all worker instances of your BusinessLogic DoFn will
need access to all records of that BQ read, and the only way to do
this is through a side input. If you send this data in the
normal input and you have multiple workers, each worker will only get some,
not all, of the BQ records, so each worker's internal state would be
inconsistent.
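
To make the difference concrete, here is a minimal plain-Java simulation
(no Beam dependency). It is only a sketch under stated assumptions: the
class and method names are hypothetical, the "workers" are just in-memory
maps, and round-robin delivery stands in for however a real runner
distributes elements. It shows that splitting records across workers
leaves each one with a partial view, while a broadcast (the role a side
input plays) gives every worker the full set.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; it does not use any Beam API.
class WorkerStateDemo {

    /** Builds n sample key/value records standing in for the BQ rows. */
    static Map<String, String> sampleRecords(int n) {
        Map<String, String> records = new HashMap<>();
        for (int i = 0; i < n; i++) {
            records.put("key-" + i, "state-" + i);
        }
        return records;
    }

    /** Simulates the main input: each record reaches exactly one worker. */
    static List<Map<String, String>> shardAcrossWorkers(
            Map<String, String> records, int workers) {
        List<Map<String, String>> states = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            states.add(new HashMap<>());
        }
        int i = 0;
        // Round-robin is an assumption made only for this illustration.
        for (Map.Entry<String, String> e : records.entrySet()) {
            states.get(i % workers).put(e.getKey(), e.getValue());
            i++;
        }
        return states;
    }

    /** Simulates a side input: every worker receives the complete record set. */
    static List<Map<String, String>> broadcastToWorkers(
            Map<String, String> records, int workers) {
        List<Map<String, String>> states = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            states.add(new HashMap<>(records));
        }
        return states;
    }

    public static void main(String[] args) {
        Map<String, String> bqRecords = sampleRecords(10);
        List<Map<String, String>> sharded = shardAcrossWorkers(bqRecords, 3);
        List<Map<String, String>> broadcast = broadcastToWorkers(bqRecords, 3);
        for (int w = 0; w < 3; w++) {
            System.out.println("worker " + w
                    + ": sharded=" + sharded.get(w).size()
                    + " broadcast=" + broadcast.get(w).size());
        }
    }
}
```

In the sharded case no single worker can answer a lookup for every key,
which is the inconsistency described above.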

> Using big query client would mean we would have to run individual
> queries for each of these 300k keys from the BusinessLogic() dofn which
> operates in a global window KV

Or read all the BigQuery records at once on BusinessLogic() startup and
store them in the internal state ... which ends up being the same as using
a side input.
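
As a rough illustration of the "read everything once at startup"
alternative, the following plain-Java sketch loads all records with a
single bulk call and then serves lookups from memory. Everything here is
an assumption for illustration: StartupStateCache is a hypothetical class,
and the Supplier stands in for whatever single bulk query would fetch the
rows; no BigQuery or Beam API is used. In a Beam DoFn, a method annotated
with @Setup would be one candidate place to trigger such a load.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper; the Supplier stands in for one bulk query.
class StartupStateCache {
    private final Supplier<Map<String, String>> bulkLoader;
    private Map<String, String> state; // null until setup() has run
    private int loadCount = 0;

    StartupStateCache(Supplier<Map<String, String>> bulkLoader) {
        this.bulkLoader = bulkLoader;
    }

    /** Loads all records once; later calls are no-ops. */
    synchronized void setup() {
        if (state == null) {
            state = new HashMap<>(bulkLoader.get());
            loadCount++;
        }
    }

    /** Serves a key from memory instead of issuing one query per key. */
    synchronized String lookup(String key) {
        if (state == null) {
            throw new IllegalStateException("setup() has not run yet");
        }
        return state.get(key);
    }

    /** Number of times the bulk loader was actually invoked. */
    synchronized int loadCount() {
        return loadCount;
    }

    public static void main(String[] args) {
        // Fake loader returning two rows, in place of a real query.
        Supplier<Map<String, String>> fakeBulkQuery = () -> {
            Map<String, String> rows = new HashMap<>();
            rows.put("key-1", "state-1");
            rows.put("key-2", "state-2");
            return rows;
        };
        StartupStateCache cache = new StartupStateCache(fakeBulkQuery);
        cache.setup();
        cache.setup(); // second call does not reload
        System.out.println(cache.lookup("key-1"));
        System.out.println("bulk loads: " + cache.loadCount());
    }
}
```

Each worker instance would hold its own full copy of the records, which
is why this ends up equivalent to a side input.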


-- 
Niel Markwick
Cloud Solutions Architect
Google Belgium
ni...@google.com
+32 2 894 6771


On Wed, 1 Mar 2023 at 02:15, Sahil Modak via dev <dev@beam.apache.org>
wrote:

> The number of keys/data in BQ would not be constant and grow with time.
>
> A rough estimate would be around 300k keys with an average size of 5kb per
> key. Both the count of the keys and the size of the key would be feature
> dependent (based on the upstream pipelines) and we won't have control over
> this in the future.
>
> Using big query client would mean we would have to run individual queries
> for each of these 300k keys from the BusinessLogic() dofn which operates in
> a global window KV
>
> Also, the order of the data from BQ would not matter to us since the only
> thing we are trying to solve here is regaining the state spec information
> before starting to consume pub/sub.
>
> I will explore using Wait.on(bigquery) before pub/sub read since I am not
> sure if side input would be the best option here.
>
>
> On Tue, Feb 28, 2023 at 8:44 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> I'm also curious how much you depend on order to get the state contents
>> right. The ordering of the side input will be arbitrary, and even the
>> streaming input can have plenty of out of order messages. So I want to
>> think about what are the data dependencies that result in the requirement
>> of order. Or if there are none and you just want to know that all the past
>> data has been processed, Niel's idea is one solution. It isn't parallel,
>> though.
>>
>> Kenn
>>
>> On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax <re...@google.com> wrote:
>>
>>> How large is this state spec stored in BQ? If the size isn't too large,
>>> you can read it from BQ and make it a side input into the DoFn.
>>>
>>> On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak <
>>> smo...@paloaltonetworks.com> wrote:
>>>
>>>> We are trying to re-initialize our state specs in the BusinessLogic()
>>>> DoFn from BQ.
>>>> BQ has data about the state spec, and we would like to make sure that
>>>> the state specs in our BusinessLogic() dofn are initialized before it
>>>> starts consuming the pub/sub.
>>>>
>>>> This is for handling the case of redeployment of the dataflow jobs so
>>>> that the states are preserved and the BusinessLogic() can work seamlessly
>>>> as it was previously. All our dofns are operating in a global window and do
>>>> not perform any aggregation.
>>>>
>>>> We are currently using Redis to preserve the state spec information but
>>>> would like to explore using BQ as an alternative to Redis.
>>>>
>>>> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> My suggestion is to try to solve the problem in terms of what you want
>>>>> to compute. Instead of trying to control the operational aspects like
>>>>> "read all the BQ before reading Pubsub" there is presumably some reason
>>>>> that the BQ data naturally "comes first", for example if its timestamps
>>>>> are earlier or if there is a join or an aggregation that must include it.
>>>>> Whenever you think you want to set up an operational dependency between
>>>>> two things that "happen" in a pipeline, it is often best to pivot your
>>>>> thinking to the data and what you are trying to compute, and the built-in
>>>>> dependencies will solve the ordering problems.
>>>>>
>>>>> So - is there a way to describe your problem in terms of the data and
>>>>> what you are trying to compute?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> First, PCollections are completely unordered, so there is no guarantee
>>>>>> on what order you'll see events in the flattened PCollection.
>>>>>>
>>>>>> There may be ways to process the BigQuery data in a separate transform
>>>>>> first, but it depends on the structure of the data. How large is the
>>>>>> BigQuery table? Are you doing any windowed aggregations here?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <
>>>>>> smo...@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Yes, this is a streaming pipeline.
>>>>>>>
>>>>>>> Some more details about the existing implementation vs. what we want
>>>>>>> to achieve.
>>>>>>>
>>>>>>> Current implementation:
>>>>>>> Reading from pub-sub:
>>>>>>>
>>>>>>> Pipeline input = Pipeline.create(options);
>>>>>>>
>>>>>>> PCollection<String> pubsubStream = input.apply("Read From Pubsub",
>>>>>>>         PubsubIO.readMessagesWithAttributesAndMessageId()
>>>>>>>                 .fromSubscription(inputSubscriptionId))
>>>>>>>
>>>>>>>
>>>>>>> Reading from bigquery:
>>>>>>>
>>>>>>> PCollection<String> bqStream = input.apply("Read from BQ", BigQueryIO
>>>>>>>         .readTableRows().fromQuery(bqQuery).usingStandardSql())
>>>>>>>         .apply("JSon Transform", AsJsons.of(TableRow.class));
>>>>>>>
>>>>>>>
>>>>>>> Merge the inputs:
>>>>>>>
>>>>>>> PCollection<String> mergedInput = PCollectionList.of(pubsubStream)
>>>>>>>         .and(bqStream)
>>>>>>>         .apply("Merge Input", Flatten.pCollections());
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Business Logic:
>>>>>>>
>>>>>>> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()))
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Above logic is what we use currently in our pipeline.
>>>>>>>
>>>>>>> We want to make sure that we read from BigQuery first & pass the 
>>>>>>> bqStream through our BusinessLogic() before we start consuming 
>>>>>>> pubsubStream.
>>>>>>>
>>>>>>> Is there a way to achieve this?
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Sahil
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Can you explain this use case some more? Is this a streaming
>>>>>>>> pipeline? If so, how are you reading from BigQuery?
>>>>>>>>
>>>>>>>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <
>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We have a requirement wherein we are consuming input from pub/sub
>>>>>>>>> (PubSubIO) as well as BQ (BQIO)
>>>>>>>>>
>>>>>>>>> We want to make sure that we consume the BQ stream first before we
>>>>>>>>> start consuming the data from pub-sub. Is there a way to achieve
>>>>>>>>> this? Can you please help with some code samples?
>>>>>>>>>
>>>>>>>>> Currently, we read data from big query using BigQueryIO into a
>>>>>>>>> PCollection & also read data from pubsub using PubsubIO. We then use
>>>>>>>>> the flatten transform in this manner.
>>>>>>>>>
>>>>>>>>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>>>>>>>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>>>>>>>
>>>>>>>>> kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
>>>>>>>>>         .apply("Merge Input", Flatten.pCollections());
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Sahil
>>>>>>>>>
>>>>>>>>>
