[ 
https://issues.apache.org/jira/browse/BEAM-13298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484078#comment-17484078
 ] 

Ranjan Dahal edited comment on BEAM-13298 at 1/29/22, 5:57 AM:
---------------------------------------------------------------

Happy to review the PR. 

I would like to understand the use case for returning an output collection from 
a Write transform. In general, a Write transform doesn't modify the PCollection, 
so I would imagine we are just looking to return the same PCollection that is 
passed to the Write transform. I do see the value of returning an output in 
some cases where read/transformation/write happen in the same chain of 
transforms. However, I also think we should have an option to return void, 
because in some cases the user may not care about the output. JdbcIO has both 
implementations, which allows flexibility based on the user's use case. 

To illustrate, see the code snippet below, with comments:
{code:java}
// Initial Read transform
PCollection<T> readCollection = pipeline.apply(KafkaIO.<String, String>read());

// Aren't readCollection and collectionAfterFirstWrite the same? Holding both
// collections will increase the memory footprint, as a PCollection is immutable.
PCollection<T> collectionAfterFirstWrite =
    readCollection.apply(KafkaIO.WriteRecordsWithOutput());

readCollection.apply(Wait.on(collectionAfterFirstWrite))
     // Windows of this intermediate PCollection will be processed no earlier
     // than when the respective window of collectionAfterFirstWrite closes.
     .apply(ParDo.of(...write to other destination...));
{code}
My initial reasoning for returning a PCollection<Void> was to be able to wait 
on the prior transform, like so:
{code:java}
// Initial Read transform
PCollection<T> readCollection = pipeline.apply(KafkaIO.<String, String>read());

// readCollection and collectionAfterWrite are not the same. The memory
// footprint of the Void PCollection should be negligible compared to
// PCollection<T>.
PCollection<Void> collectionAfterWrite =
    readCollection.apply(KafkaIO.WriteRecordsWithOutput());

readCollection.apply(Wait.on(collectionAfterWrite))
     // Windows of this intermediate PCollection will be processed no earlier
     // than when the respective window of collectionAfterWrite closes.
     .apply(ParDo.of(...write to other destination...));
{code}
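
As a loose plain-Java analogy (not Beam code), the sketch below shows why a payload-free signal is enough for this kind of sequencing: the second write still reads the original input, and the signal only says when the first write has finished. The class VoidSignalSketch and the helper writeAll are hypothetical names made up for illustration. CompletableFuture<Void> merely stands in for PCollection<Void>, and unlike Wait.on it has no notion of windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Plain-Java analogy, NOT Beam code: a Void completion signal gates a second write. */
class VoidSignalSketch {

    /**
     * Hypothetical stand-in for a write step. It appends every element to the
     * sink and returns a signal that carries no elements, only completion.
     */
    static CompletableFuture<Void> writeAll(List<String> input, List<String> sink, String label) {
        return CompletableFuture.runAsync(() -> {
            for (String element : input) {
                sink.add(label + ":" + element);
            }
        });
    }

    /** Runs the first write, waits on its signal, then writes the same input again. */
    static List<String> run(List<String> input) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> firstWriteDone = writeAll(input, log, "first");
        // The signal has no payload; the second write reads the original input.
        firstWriteDone.thenCompose(ignored -> writeAll(input, log, "second")).join();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b")));
    }
}
```

Running main prints [first:a, first:b, second:a, second:b]: every element of the first write lands before any element of the second, even though the signal itself holds no data.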
 

With all that said, it would be ideal to have both WriteRecordsWithOutput, 
which returns an output collection, and 
WriteRecordsWithOutput.withVoidResults(), which returns PCollection<Void>. 

In addition, it would be nice to be consistent across IOs and use the same 
class or method *names* for similar behaviors.  

 



> Add "withResults()" for KafkaIO.write
> -------------------------------------
>
>                 Key: BEAM-13298
>                 URL: https://issues.apache.org/jira/browse/BEAM-13298
>             Project: Beam
>          Issue Type: Sub-task
>          Components: io-ideas, io-java-kafka
>         Environment: Production
>            Reporter: Ranjan Dahal
>            Assignee: Wei Fong Hsia
>            Priority: P2
>              Labels: KafkaIO
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> I am looking at a use case where we have to wait until the Kafka write 
> operation has completed before we can move on to another transform. 
> Currently, JdbcIO supports withResults(), which, together with 
> Wait.on(signal), lets a pipeline wait for the previous transform to complete 
> before moving on to the next. Similarly, it would be very beneficial to have 
> this capability in KafkaIO (and in others such as PubsubIO, BigQueryIO, etc.). 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
