I second Alexey (and thanks Alexey ;)). I also started similar improvements in other IOs (PRs will come soon).
Regards
JB

On 17/04/2019 17:31, Alexey Romanenko wrote:
> Hi Jonathan,
>
> I just wanted to let you know that this feature [1] has been implemented
> and, finally, merged into master. So, it should be included in the next
> Beam 2.13 release.
>
> In a few words, a new method called "Write.withResults()" was added. It
> returns a WriteVoid transform that provides a "PCollection<Void>" as an
> output, which can be used together with "Wait.on()". So, a simple example
> of writing into two different databases can look like this:
>
>     PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
>         .withDataSourceConfiguration(CONF_DB_1).withResults());
>     data.apply(Wait.on(firstWriteResults))
>         .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
>
> [1] https://issues.apache.org/jira/browse/BEAM-6732
>
>> On 22 Feb 2019, at 16:52, Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>
>> I have created a new Jira issue for this feature:
>> https://issues.apache.org/jira/browse/BEAM-6732
>>
>> Jonathan, feel free to assign it to yourself if you want to
>> contribute; it is always welcome =)
>>
>>> On 21 Feb 2019, at 10:23, Jonathan Perron <jonathan.per...@lumapps.com> wrote:
>>>
>>> Thank you Eugene for your answer.
>>>
>>> Given your explanation, I think I will go with your 3rd solution,
>>> as it seems the most robust and friendly approach.
>>>
>>> Jonathan
>>>
>>> On 21/02/2019 02:22, Eugene Kirpichov wrote:
>>>> Hi Jonathan,
>>>>
>>>> Wait.on() requires a PCollection; it is not possible to change it
>>>> to wait on a PDone, because all PDones in the pipeline are the same,
>>>> so it's not clear what exactly you'd be waiting on.
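The difference between a PDone and a waitable output can be illustrated with a plain-Java analogy. This is only a sketch: no Beam classes are involved, and the names WaitOnAnalogy, Done, write, writeWithResults and writeAfter are hypothetical. A fire-and-forget write hands back the same token no matter what it wrote, much as a PDone carries nothing to wait on. A withResults()-style write hands back a signal tied to that specific write, and the second write is chained onto that signal the way Wait.on() gates the second JdbcIO.write() in the example above. The sketch ignores windowing: Beam's Wait.on() waits per window, which in a batch pipeline with the global window roughly amounts to waiting for the whole first write.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java analogy only (hypothetical names): none of these types exist in Beam.
public class WaitOnAnalogy {

    // Analogy for PDone: every write returns the same token, so it cannot
    // tell you which write finished, or whether any write has finished.
    enum Done { INSTANCE }

    // Fire-and-forget write: starts the work asynchronously, returns the shared token.
    static Done write(List<String> table, List<String> rows) {
        CompletableFuture.runAsync(() -> table.addAll(rows));
        return Done.INSTANCE;
    }

    // withResults()-style write: the returned signal completes when this write is done.
    static CompletableFuture<Void> writeWithResults(List<String> table, List<String> rows) {
        return CompletableFuture.runAsync(() -> table.addAll(rows));
    }

    // Analogy for data.apply(Wait.on(signal)).apply(secondWrite): the second write
    // only starts after the signal completes, so it can safely read what the first
    // write stored (like the SELECT on the first table in the original question).
    static CompletableFuture<Void> writeAfter(CompletableFuture<Void> signal,
                                              List<String> source, List<String> target) {
        return signal.thenRun(() -> target.addAll(source));
    }

    public static void main(String[] args) {
        List<String> data = List.of("user-1", "user-2", "user-3");

        // Two unrelated fire-and-forget writes return indistinguishable tokens.
        System.out.println(write(new ArrayList<>(), data) == write(new ArrayList<>(), data)); // true

        List<String> users = Collections.synchronizedList(new ArrayList<>());
        List<String> visits = Collections.synchronizedList(new ArrayList<>());

        CompletableFuture<Void> firstWrite = writeWithResults(users, data);
        writeAfter(firstWrite, users, visits).join();
        System.out.println(visits); // [user-1, user-2, user-3]
    }
}
```

Using the fire-and-forget write() for the first step would leave nothing to chain on, so the second write could observe a partially filled table; that is the same gap a PDone leaves in a pipeline.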
>>>>
>>>> To use the Wait transform with JdbcIO.write(), you would need to
>>>> change
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L761-L762
>>>> to simply "return input.apply(ParDo.of(...))" and propagate that into
>>>> the type signature. Then you'd get a waitable PCollection<Void>.
>>>>
>>>> This is a very simple but backwards-incompatible change. It's up to
>>>> the Beam community whether/when to make it.
>>>>
>>>> It's also possible to make a slightly larger but compatible change,
>>>> where JdbcIO.write() would stay as is, but you could write e.g.
>>>> "JdbcIO.write().withResults()", which would be a new transform that
>>>> *does* return results and is waitable. A similar approach is taken
>>>> in TextIO.write().withOutputFilenames().
>>>>
>>>> On Wed, Feb 20, 2019 at 4:58 AM Jonathan Perron
>>>> <jonathan.per...@lumapps.com> wrote:
>>>>
>>>> Hello folks,
>>>>
>>>> I am facing a special case where I need to wait for a
>>>> JdbcIO.write() operation to complete before starting a second one.
>>>>
>>>> In detail, I have a PCollection<Map<String, String>> which is used
>>>> to fill two different SQL statements. It is used in a first
>>>> JdbcIO.write() operation to store anonymized users in a table
>>>> (userId with an associated userUuid generated with
>>>> UUID.randomUUID()). These two values have a unique constraint,
>>>> meaning that a userId cannot have multiple userUuids.
>>>> Unfortunately, the UUID will be different on each run of my
>>>> pipeline, which means that I need to query this table at some
>>>> point, or use what I describe below.
>>>>
>>>> I am planning to fill a second table with this userUuid and a few
>>>> other pieces of information, such as the time of first visit.
>>>> To limit I/O, and since I have a lot of information in my
>>>> PCollection, I want to use it once more with a different SQL
>>>> statement, where the userUuid is read from the first table using a
>>>> SELECT statement. This cannot work if the first JdbcIO.write()
>>>> operation is not complete.
>>>>
>>>> I saw that the Java SDK provides a Wait.on() PTransform, but it is
>>>> unfortunately only compatible with a PCollection, not with a PDone
>>>> such as the one output by the JdbcIO operation. Could my issue be
>>>> solved by extending Wait.on(), or should I go with another solution?
>>>> If so, how could I implement it?
>>>>
>>>> Many thanks for your input!
>>>>
>>>> Jonathan
>>>>
>>
>
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com