[
https://issues.apache.org/jira/browse/BEAM-13298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484078#comment-17484078
]
Ranjan Dahal edited comment on BEAM-13298 at 1/29/22, 5:57 AM:
---------------------------------------------------------------
Happy to review the PR.
I would like to understand the use case for returning an output collection from
a Write transform. In general, a Write transform doesn't modify the PCollection,
so I would imagine we are just looking to return the same PCollection that is
passed to the Write transform. I do see the value of returning an output in
cases where read, transformation, and write happen in the same chain of
transforms. However, I also think we should have an option to return void,
because in some cases the user may not care about the output. JdbcIO has both
implementations, which allows flexibility based on the user's use case.
To better understand this, see the code snippet below with comments:
{code:java}
// Initial Read transform
PCollection<T> readCollection = pipeline.apply(KafkaIO.<String, String>read());

// Aren't readCollection and collectionAfterFirstWrite the same? Keeping both
// increases the memory footprint, since a PCollection is immutable.
PCollection<T> collectionAfterFirstWrite =
    readCollection.apply(KafkaIO.WriteRecordsWithOutput());

// Windows of this intermediate PCollection will be processed no earlier than
// when the respective window of collectionAfterFirstWrite closes.
readCollection
    .apply(Wait.on(collectionAfterFirstWrite))
    .apply(ParDo.of(...write to other destination...));
{code}
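For context on why a result is needed at all: KafkaIO's existing write transforms return PDone, so today there is nothing to hand to Wait.on as a signal. A minimal sketch of the current shape, assuming a PCollection<KV<String, String>> named records and placeholder broker/topic values:
{code:java}
// Current KafkaIO: the write terminates the chain with PDone,
// so there is no output that downstream transforms can Wait.on.
PDone done =
    records.apply(
        KafkaIO.<String, String>write()
            .withBootstrapServers("broker-1:9092")   // placeholder broker
            .withTopic("output-topic")               // placeholder topic
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class));
{code}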
My initial thought behind returning a PCollection<Void> was to be able to wait
on the prior transform, as shown below.
{code:java}
// Initial Read transform
PCollection<T> readCollection = pipeline.apply(KafkaIO.<String, String>read());

// readCollection and collectionAfterWrite are not the same. The memory footprint
// of the PCollection<Void> should be negligible compared to the PCollection<T>.
PCollection<Void> collectionAfterWrite =
    readCollection.apply(KafkaIO.WriteRecordsWithOutput());

// Windows of this intermediate PCollection will be processed no earlier than
// when the respective window of collectionAfterWrite closes.
readCollection
    .apply(Wait.on(collectionAfterWrite))
    .apply(ParDo.of(...write to other destination...));
{code}
With all that said, it would be ideal to have a WriteRecordsWithOutput that
returns an output collection and a WriteRecordsWithOutput.withVoidResults() that
returns PCollection<Void>.
In addition, it would be nice to be consistent across IOs and use the
same class or method *names* for similar kinds of behavior.
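For reference, a rough sketch of the existing JdbcIO.write().withResults() pattern that the naming above would mirror; the driver, JDBC URL, statement, and the rows/otherInput collections are placeholders for illustration only:
{code:java}
// JdbcIO.Write.withResults() turns the write into a transform that emits a
// PCollection<Void>, which can then be used as a Wait.on signal.
PCollection<Void> writeSignal =
    rows.apply(
        JdbcIO.<KV<Integer, String>>write()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                    "org.postgresql.Driver",                       // placeholder driver
                    "jdbc:postgresql://localhost:5432/db"))        // placeholder URL
            .withStatement("INSERT INTO person VALUES (?, ?)")     // placeholder statement
            .withPreparedStatementSetter(
                (KV<Integer, String> element, PreparedStatement stmt) -> {
                  stmt.setInt(1, element.getKey());
                  stmt.setString(2, element.getValue());
                })
            .withResults());

// Downstream work starts only after the corresponding write windows complete.
otherInput
    .apply(Wait.on(writeSignal))
    .apply(ParDo.of(...write to other destination...));
{code}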
> Add "withResults()" for KafkaIO.write
> -------------------------------------
>
> Key: BEAM-13298
> URL: https://issues.apache.org/jira/browse/BEAM-13298
> Project: Beam
> Issue Type: Sub-task
> Components: io-ideas, io-java-kafka
> Environment: Production
> Reporter: Ranjan Dahal
> Assignee: Wei Fong Hsia
> Priority: P2
> Labels: KafkaIO
> Original Estimate: 4h
> Remaining Estimate: 4h
>
> I am looking at a use case where we have to wait until the Kafka Write
> operation is completed before we can move forward with another transform.
> Currently, JdbcIO supports withResults(), which waits for the previous
> transform to complete as part of Wait.on(Signal) before moving on to the next.
> Similarly, it would be very beneficial to have this capability on KafkaIO
> (and others like PubSubIO, BigQueryIO, etc.).