[ 
https://issues.apache.org/jira/browse/BEAM-13584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Romanenko updated BEAM-13584:
------------------------------------
    Description: 
Currently, by default, a "Write" transform is considered as a final 
{{PTransform}} in a pipeline and it returns {{PDone}} that can't be used later 
for next pipeline stages. 

Though, sometimes it's needed to use the results of write operation as input 
{{PCollection}} downstream of pipeline and some IOs already support this. For 
example, {{JdbcIO.Write}} has the methods {{withResults()}} and 
{{withWriteResults()}} that returns a {{PTransform}} with output 
{{PCollection<>}} of write results and it can be used to analyse the results 
or, along with {{Wait}} transform, to write into the second database only if 
writes into the first one were finished, like this:

{code}
PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
     .withDataSourceConfiguration(CONF_DB_1).withResults());
data.apply(Wait.on(firstWriteResults))
     .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
{code}

> Add "Write.withResults()" for all Java SDK IOs
> ----------------------------------------------
>
>                 Key: BEAM-13584
>                 URL: https://issues.apache.org/jira/browse/BEAM-13584
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-ideas
>            Reporter: Alexey Romanenko
>            Priority: P2
>
> Currently, by default, a "Write" transform is considered as a final 
> {{PTransform}} in a pipeline and it returns {{PDone}} that can't be used 
> later for next pipeline stages. 
> Though, sometimes it's needed to use the results of write operation as input 
> {{PCollection}} downstream of pipeline and some IOs already support this. For 
> example, {{JdbcIO.Write}} has the methods {{withResults()}} and 
> {{withWriteResults()}} that returns a {{PTransform}} with output 
> {{PCollection<>}} of write results and it can be used to analyse the results 
> or, along with {{Wait}} transform, to write into the second database only if 
> writes into the first one were finished, like this:
> {code}
> PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
>      .withDataSourceConfiguration(CONF_DB_1).withResults());
> data.apply(Wait.on(firstWriteResults))
>      .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to