I second Alexey (and thanks Alexey ;)).

I also started similar improvements in other IOs (PRs will come soon).

Regards
JB

On 17/04/2019 17:31, Alexey Romanenko wrote:
> Hi Jonathan,
> 
> I just wanted to let you know that this feature [1] has been implemented
> and, finally, merged into master. So, it should be included in the
> upcoming Beam 2.13 release.
> 
> In short, a new method called Write.withResults() was added. It returns
> a WriteVoid transform that produces a PCollection<Void> as its output,
> which can be used together with Wait.on(). So, a simple example of
> writing into two different databases, one after the other, can look
> like this:
> 
> PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
>     .withDataSourceConfiguration(CONF_DB_1).withResults());
> data.apply(Wait.on(firstWriteResults))
>     .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
> 
> [1] https://issues.apache.org/jira/browse/BEAM-6732
> 
>> On 22 Feb 2019, at 16:52, Alexey Romanenko <aromanenko....@gmail.com>
>> wrote:
>>
>> I have created a new Jira issue for this feature:
>> https://issues.apache.org/jira/browse/BEAM-6732
>>
>> Jonathan, feel free to assign it to yourself if you want to
>> contribute; contributions are always welcome =)
>>
>>> On 21 Feb 2019, at 10:23, Jonathan Perron
>>> <jonathan.per...@lumapps.com> wrote:
>>>
>>> Thank you Eugene for your answer.
>>>
>>> Based on your explanation, I think I will go with your 3rd
>>> solution, as it seems the most robust and user-friendly approach.
>>>
>>> Jonathan
>>>
>>> On 21/02/2019 02:22, Eugene Kirpichov wrote:
>>>> Hi Jonathan,
>>>>
>>>> Wait.on() requires a PCollection - it is not possible to change it
>>>> to wait on PDone because all PDone instances in the pipeline are the
>>>> same, so it's not clear what exactly you'd be waiting on.
>>>>
>>>> To use the Wait transform with JdbcIO.write(), you would need to
>>>> change
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L761-L762
>>>> to simply "return input.apply(ParDo.of(...))" and propagate that into
>>>> the type signature. Then you'd get a waitable PCollection<Void>.
>>>>
>>>> This is a very simple, but backwards-incompatible change. Up to the
>>>> Beam community whether/when people would want to make it.
>>>>
>>>> It's also possible to make a slightly larger but compatible change,
>>>> where JdbcIO.write() would stay as is, but you could write e.g.
>>>> "JdbcIO.write().withResults()" which would be a new transform that
>>>> *does* return results and is waitable. A similar approach is taken
>>>> in TextIO.write().withOutputFilenames().
>>>>
>>>> On Wed, Feb 20, 2019 at 4:58 AM Jonathan Perron
>>>> <jonathan.per...@lumapps.com> wrote:
>>>>
>>>>     Hello folks,
>>>>
>>>>     I have run into a special case where I need to wait for a
>>>>     JdbcIO.write() operation to complete before starting a second
>>>>     one.
>>>>
>>>>     In detail, I have a PCollection<Map<String, String>> which is
>>>>     used to fill two different SQL statements. It is used in a first
>>>>     JdbcIO.write() operation to store anonymized users in a table
>>>>     (userId with an associated userUuid generated with
>>>>     UUID.randomUUID()). These two parameters have a unique
>>>>     constraint, meaning that a userId cannot have multiple
>>>>     userUuids. Unfortunately, across several runs of my pipeline,
>>>>     the generated UUID will be different, meaning that I need to
>>>>     query this table at some point, or use the approach I describe
>>>>     below.
>>>>
>>>>     I am planning to fill a second table with this userUuid along
>>>>     with a few other pieces of information, such as the time of
>>>>     first visit. To limit I/O, and since I have a lot of information
>>>>     in my PCollection, I want to use it once more with a different
>>>>     SQL statement, where the userUuid is read from the first table
>>>>     using a SELECT statement. This cannot work if the first
>>>>     JdbcIO.write() operation is not complete.
>>>>
>>>>     I saw that the Java SDK provides a Wait.on() PTransform, but it
>>>>     is unfortunately only compatible with a PCollection, and not
>>>>     with a PDone such as the one output by the JdbcIO operation.
>>>>     Could my issue be solved by extending Wait.on(), or should I go
>>>>     with another solution? If so, how could I implement it?
>>>>
>>>>     Many thanks for your input!
>>>>
>>>>     Jonathan
>>>>
>>
> 
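
For readers who want to see the ordering idea from this thread in isolation, here is a minimal sketch in plain Java, using CompletableFuture as a stand-in for Beam's Wait.on(). It is only an analogy, not Beam code: WaitOnSketch, writeWithResults, writeAfter and the in-memory LOG are hypothetical names invented for this illustration, and no real database or Beam pipeline is involved. The first write returns a completion signal, and the second write is chained so that it starts only once that signal has completed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class WaitOnSketch {
    // Hypothetical stand-in for the two databases: records every row written, in order.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    // Analogue of a write "with results": performs the write asynchronously and
    // returns a signal that completes when all rows have been written.
    static CompletableFuture<Void> writeWithResults(String db, List<String> rows) {
        return CompletableFuture.runAsync(() -> {
            for (String row : rows) {
                LOG.add(db + ":" + row);
            }
        });
    }

    // Analogue of waiting on a signal before writing: the second write is
    // scheduled to run only after the given signal has completed.
    static CompletableFuture<Void> writeAfter(
            CompletableFuture<Void> signal, String db, List<String> rows) {
        return signal.thenRun(() -> {
            for (String row : rows) {
                LOG.add(db + ":" + row);
            }
        });
    }

    // Writes the same data to "db1", then to "db2", and returns the write order.
    static List<String> run() {
        LOG.clear();
        List<String> data = List.of("u1", "u2");
        CompletableFuture<Void> firstWriteResults = writeWithResults("db1", data);
        writeAfter(firstWriteResults, "db2", data).join(); // block until both writes finish
        return List.copyOf(LOG);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In an actual Beam pipeline, the role of the completion signal is played by the PCollection<Void> returned by withResults(), as in Alexey's snippet quoted above.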

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
