On Tue, Apr 1, 2025 at 12:01 PM Jonathan Hope
<jonathan.douglas.h...@gmail.com> wrote:
>
> Unfortunately both databases will be online during this, so conflicts could 
> occur in either direction. I had previously dug up an answer around modifying 
> the JdbcIO here: 
> https://stackoverflow.com/questions/56398422/exception-handling-in-apache-beam-pipelines-when-writing-to-database-using-java.
>  But I just wanted to check that there wasn't a more "official" approach that 
> I hadn't come across yet.

That is great. Anyone want to volunteer to move this into Beam itself?

> On Tue, Apr 1, 2025 at 11:53 AM Robert Bradshaw via user 
> <u...@beam.apache.org> wrote:
>>
>> Good question. I think it depends on who else is modifying the SQL database.
>>
>> In the easy case (e.g. everything you want to write to your SQL
>> database comes from the NoSQL source) you could group (e.g. via a
>> GroupByKey) on your identifier, filter out duplicates with a
>> subsequent DoFn, and then write to your SQL database with the
>> assurance that there will be no duplicate keys.
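The grouping step described above can be sketched outside Beam. This is a minimal plain-Python illustration of what a GroupByKey on the identifier followed by a deduplicating DoFn accomplishes (the record shapes match the example document in the thread; the function name is made up for illustration):

```python
from collections import defaultdict

def dedup_by_identifier(records):
    """Group records by their user-provided identifier and keep one
    record per group, mimicking a GroupByKey followed by a DoFn that
    drops duplicates before the SQL write."""
    groups = defaultdict(list)
    for record in records:
        groups[record["identifier"]].append(record)
    # Keep the first record per identifier; the rest are duplicates.
    return [group[0] for group in groups.values()]

records = [
    {"id": "1234", "identifier": "5678"},
    {"id": "9999", "identifier": "5678"},  # duplicate identifier
    {"id": "4321", "identifier": "8765"},
]
deduped = dedup_by_identifier(records)
print(deduped)  # one record per distinct identifier
```

In an actual pipeline the grouping would be done by Beam's GroupByKey so the guarantee holds across workers, not just within one batch.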
>>
>> If you are concerned that there might be already existing, conflicting
>> entries in the SQL database, you can also add a step to check to see
>> if the identifier is already inserted (and, following the previous
>> step, wouldn't have to worry about another worker from this job
>> inserting the same identifier).
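The pre-write existence check amounts to a keyed lookup against the target table. A sketch using sqlite3 as a stand-in for the real SQL database (the `docs` table name and columns are assumptions matching the example schema in the thread):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id TEXT, identifier TEXT UNIQUE)")
conn.execute("INSERT INTO docs VALUES ('1111', '5678')")

def already_inserted(conn, identifier):
    """Return True if this identifier already exists in the target table."""
    row = conn.execute(
        "SELECT 1 FROM docs WHERE identifier = ?", (identifier,)
    ).fetchone()
    return row is not None

print(already_inserted(conn, "5678"))  # True: already present
print(already_inserted(conn, "8765"))  # False: safe to insert
```

As noted above, this check is only sufficient when no other writer can insert between the check and the write.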
>>
>> If there are external, concurrent processes also modifying the SQL
>> database that you have to worry about, that gets a bit trickier, and
>> it seems necessary to do the dead letter queue as part of the write
>> itself. This may require modifying JdbcIO itself to return a dead
>> letter queue.
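The try-the-write, capture-the-failure pattern a modified JdbcIO would need can be sketched with sqlite3: attempt each insert, catch the unique-constraint violation, and route the failed record to a dead letter collection instead of failing the bundle (table name and record shape are assumptions from the thread's example; real JDBC code would catch the driver's constraint-violation exception instead):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id TEXT, identifier TEXT UNIQUE)")

def write_with_dlq(conn, records):
    """Attempt each insert; route unique-constraint failures to a
    dead letter list instead of failing the whole batch."""
    dead_letters = []
    for record in records:
        try:
            with conn:  # commits on success, rolls back on exception
                conn.execute(
                    "INSERT INTO docs VALUES (?, ?)",
                    (record["id"], record["identifier"]),
                )
        except sqlite3.IntegrityError:
            dead_letters.append(record)
    return dead_letters

records = [
    {"id": "1234", "identifier": "5678"},
    {"id": "9999", "identifier": "5678"},  # conflicts with the first
]
failed = write_with_dlq(conn, records)
print(failed)  # only the conflicting record
```

Because the failure is caught at the point of the write itself, this is robust to concurrent writers in a way the check-then-write approaches are not.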
>>
>> - Robert
>>
>>
>>
>> On Tue, Apr 1, 2025 at 11:12 AM Jonathan Hope
>> <jonathan.douglas.h...@gmail.com> wrote:
>> >
>> > Hello, I had a question and was hoping this was the right place to ask.
>> >
>> > Let's say I'm moving data from a NoSQL database to a SQL database. Here is 
>> > an example document in the NoSQL database:
>> >
>> > {
>> >  "id": "1234",
>> >   "identifier": "5678"
>> > }
>> >
>> > The id is system generated, and the identifier is user provided. This is 
>> > being moved into a SQL database with two columns:
>> >
>> > id
>> > identifier
>> >
>> > In the SQL database there is a UNIQUE index on identifier, however the 
>> > same thing cannot be enforced on the NoSQL side. Now I could check for 
>> > this like so:
>> >
>> > Get source data
>> > Check to see if identifier has already been inserted
>> > Move duplicates to a dead letter queue
>> > Write the data
>> > Success
>> >
>> > But what could happen is:
>> >
>> > Get source data
>> > Check to see if identifier has already been inserted
>> > Move duplicates to a dead letter queue
>> > Another worker inserts a duplicate identifier
>> > Write the data
>> > Failure
>> >
>> >
>> > If I was doing this outside of the beam context I would try the write, 
>> > capture the errors, and then redirect the failures to some kind of dead 
>> > letter queue. However for the life of me I can't figure out how to do this 
>> > in Beam. In cases where a failing write will never succeed on retry, and 
>> > you can't reliably check for the cause of the failure ahead of time, what 
>> > is the recommended pattern?
>> >
>> > Thanks!
