[ 
https://issues.apache.org/jira/browse/BEAM-13298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17483576#comment-17483576
 ] 

Ranjan Dahal commented on BEAM-13298:
-------------------------------------

[~kenn] Apologies for missing your comment earlier. I will try to make some 
contributions on other tickets if time permits. 

[~weifonghsia] We can take some cues from JdbcIO's withResults(), which returns 
a PCollection<Void> instead of PDone. 
[https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1647]

[https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1773]
 

I have a working version of my IO transform in my codebase where, instead of 
returning PDone or PCollection<T>, the DoFn outputs a null from processElement 
once each element has been processed, so the result is a PCollection<Void>. 
{code:java}
@ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
    // Write to Kafka/PubSub/BQ etc.

    // Still emit a null element so the output is a PCollection<Void>
    // that downstream transforms can wait on.
    ctx.output(null);
}
{code}
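
To show how the resulting PCollection<Void> could be consumed, here is a minimal sketch of the same pattern wired into Wait.on. It is not the KafkaIO implementation: the class names WaitOnWriteSketch and WriteAndSignalFn are hypothetical, the write itself is a println placeholder, and it assumes beam-sdks-java-core plus the direct runner are on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical sketch class; not part of Beam.
public class WaitOnWriteSketch {

  // Hypothetical sink DoFn: performs the write, then emits a null signal.
  static class WriteAndSignalFn extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(ProcessContext ctx) throws Exception {
      // Placeholder for the real write to Kafka/PubSub/BQ etc.
      System.out.println("writing " + ctx.element());
      // Emit a null so the output is a PCollection<Void> usable as a signal.
      ctx.output(null);
    }
  }

  static PipelineResult.State run() {
    // With default options this uses the direct runner if it is available.
    Pipeline p = Pipeline.create();

    PCollection<String> records = p.apply("Records", Create.of("a", "b", "c"));

    // The "write" step returns a signal collection instead of PDone.
    PCollection<Void> writeDone =
        records
            .apply("Write", ParDo.of(new WriteAndSignalFn()))
            .setCoder(VoidCoder.of());

    // Elements of this collection are held back until writeDone is complete
    // for the corresponding window (here, the global window).
    PCollection<String> afterWrite =
        p.apply("FollowUp", Create.of("after-write")).apply(Wait.on(writeDone));
    // Further transforms that must run after the write would be applied
    // to afterWrite here.

    return p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

The coder is set explicitly to VoidCoder so it is clear how the null elements are encoded. A withResults() on KafkaIO.write could expose a signal collection like writeDone in much the same way.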
I hope this is helpful. I am interested in seeing the PR when you submit it. 

> Add "withResults()" for KafkaIO.write
> -------------------------------------
>
>                 Key: BEAM-13298
>                 URL: https://issues.apache.org/jira/browse/BEAM-13298
>             Project: Beam
>          Issue Type: Sub-task
>          Components: io-ideas, io-java-kafka
>         Environment: Production
>            Reporter: Ranjan Dahal
>            Assignee: Alexey Romanenko
>            Priority: P2
>              Labels: KafkaIO
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> I am looking at a use case where we have to wait until the Kafka write 
> operation is completed before we can move forward with another transform. 
> Currently, JdbcIO supports withResults(), whose output can be passed to 
> Wait.on(Signal) so that the next transform starts only after the write completes. 
> Similarly, it would be very beneficial to have this capability on KafkaIO 
> (and others like PubSubIO, BigQueryIO etc). 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)