On Tue, Apr 1, 2025 at 12:01 PM Jonathan Hope
<jonathan.douglas.h...@gmail.com> wrote:
>
> Unfortunately both databases will be online during this, so conflicts could
> occur in either direction. I had previously dug up an answer about modifying
> JdbcIO here:
> https://stackoverflow.com/questions/56398422/exception-handling-in-apache-beam-pipelines-when-writing-to-database-using-java.
> But I just wanted to check that there wasn't a more "official" approach that
> I hadn't come across yet.
That is great. Anyone want to volunteer to move this into Beam itself?

> On Tue, Apr 1, 2025 at 11:53 AM Robert Bradshaw via user
> <u...@beam.apache.org> wrote:
>>
>> Good question. I think it depends on who else is modifying the SQL
>> database.
>>
>> In the easy case (e.g. everything you want to write to your SQL
>> database comes from the NoSQL source) you could group (e.g. via a
>> GroupByKey) on your identifier, filter out duplicates with a
>> subsequent DoFn, and then write to your SQL database with the
>> assurance that there will be no duplicate keys.
>>
>> If you are concerned that there might be already existing, conflicting
>> entries in the SQL database, you can also add a step to check whether
>> the identifier has already been inserted (and, following the previous
>> step, you wouldn't have to worry about another worker from this job
>> inserting the same identifier).
>>
>> If there are external, concurrent processes also modifying the SQL
>> database that you have to worry about, that gets a bit trickier, and
>> it seems necessary to handle the dead letter queue as part of the
>> write itself. This may require modifying JdbcIO itself to return a
>> dead letter queue.
>>
>> - Robert
>>
>> On Tue, Apr 1, 2025 at 11:12 AM Jonathan Hope
>> <jonathan.douglas.h...@gmail.com> wrote:
>> >
>> > Hello, I had a question and was hoping this was the right place to ask.
>> >
>> > Let's say I'm moving data from a NoSQL database to a SQL database. Here
>> > is an example document in the NoSQL database:
>> >
>> > {
>> >   "id": "1234",
>> >   "identifier": "5678"
>> > }
>> >
>> > The id is system generated, and the identifier is user provided. This is
>> > being moved into a SQL database with two columns:
>> >
>> > id
>> > identifier
>> >
>> > In the SQL database there is a UNIQUE index on identifier; however, the
>> > same thing cannot be enforced on the NoSQL side.
>> > Now I could check for this like so:
>> >
>> > Get source data
>> > Check to see if the identifier has already been inserted
>> > Move duplicates to a dead letter queue
>> > Write the data
>> > Success
>> >
>> > But what could happen is:
>> >
>> > Get source data
>> > Check to see if the identifier has already been inserted
>> > Move duplicates to a dead letter queue
>> > Another worker inserts a duplicate identifier
>> > Write the data
>> > Failure
>> >
>> > If I were doing this outside of the Beam context, I would try the
>> > write, capture the errors, and then redirect the failures to some kind
>> > of dead letter queue. However, for the life of me I can't figure out
>> > how to do this in Beam. In cases where writes are failing, retries will
>> > never succeed, and you can't reliably check for the trigger of the
>> > failure ahead of time. What is the recommended pattern?
>> >
>> > Thanks!
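The try/capture/redirect pattern described above for the non-Beam case can be sketched in a few lines. This is a minimal Python stand-in (not Beam code), using the stdlib sqlite3 module in place of the real SQL database; the table and column names are illustrative assumptions:

```python
import sqlite3

# In-memory stand-in for the target SQL database, with the
# UNIQUE index on identifier described in the thread.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (id TEXT, identifier TEXT UNIQUE)")

def write_with_dead_letters(conn, records):
    """Attempt each insert; on a unique-constraint violation,
    redirect the failing record to a dead letter list instead of
    failing (and endlessly retrying) the whole write."""
    dead_letters = []
    for record in records:
        try:
            with conn:  # each insert in its own transaction
                conn.execute(
                    "INSERT INTO records (id, identifier) VALUES (?, ?)",
                    (record["id"], record["identifier"]),
                )
        except sqlite3.IntegrityError:
            dead_letters.append(record)
    return dead_letters

records = [
    {"id": "1234", "identifier": "5678"},
    {"id": "9999", "identifier": "5678"},  # violates UNIQUE(identifier)
]
dead = write_with_dead_letters(conn, records)
# dead now holds the second record; only the first row was written
```

Doing the same thing inside a Beam pipeline would require the write transform itself to emit an output of failures, which is exactly the JdbcIO modification discussed above.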
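The first suggestion in the thread (group on the identifier, filter out duplicates, then write) can also be sketched without Beam. This hypothetical Python stand-in mimics a GroupByKey on identifier followed by a filtering DoFn that keeps one record per key and routes the rest to a dead letter output:

```python
from collections import defaultdict

def dedupe_by_identifier(records):
    """Group records by identifier, keep the first record per key,
    and route the rest to a dead letter list, so the subsequent
    write is guaranteed to contain no duplicate keys."""
    grouped = defaultdict(list)
    for record in records:
        grouped[record["identifier"]].append(record)
    to_write, dead_letters = [], []
    for group in grouped.values():
        to_write.append(group[0])
        dead_letters.extend(group[1:])
    return to_write, dead_letters

records = [
    {"id": "1234", "identifier": "5678"},
    {"id": "9999", "identifier": "5678"},  # duplicate identifier
    {"id": "4321", "identifier": "8765"},
]
to_write, dead_letters = dedupe_by_identifier(records)
# to_write has one record per identifier; dead_letters has the extras
```

Note this only removes duplicates within the batch itself; as the thread points out, it does not protect against conflicting rows already in the SQL database or written concurrently by external processes.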