How about a PCollection containing every element which was successfully
written?
Basically the same things which were passed into it.

Then you could act on every element after it's been successfully written to
the sink.
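
To make that concrete, here is a rough plain-Java model of the idea. It is
not Beam code: writeReturningWritten and the tryWrite predicate are
hypothetical names standing in for a write transform and its sink, and
ordinary lists stand in for PCollections. The only point is the shape of the
contract: the write hands back the elements it managed to write, so the next
step only ever sees those.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of "a write that returns what it wrote".
// Assumption: nothing here is the Beam API; all names are illustrative.
final class WriteResultSketch {

    // Hypothetical stand-in for a write transform. It attempts to write each
    // element via tryWrite and returns only the elements that succeeded.
    static <T> List<T> writeReturningWritten(List<T> input, Predicate<T> tryWrite) {
        List<T> written = new ArrayList<>();
        for (T element : input) {
            if (tryWrite.test(element)) {
                written.add(element);
            }
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> aggregated = List.of("a", "b", "c");
        List<String> database = new ArrayList<>();

        // Fake sink: pretend the write of "b" fails, everything else is stored.
        Predicate<String> sink = e -> !e.equals("b") && database.add(e);

        List<String> written = writeReturningWritten(aggregated, sink);

        // Downstream step (standing in for the Pubsub write) only receives
        // elements that were actually written.
        written.forEach(e -> System.out.println("publish " + e));
    }
}
```

In a real IO the returned collection would be a PCollection, which is also
the kind of signal Wait needs, as mentioned further down the thread.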

On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <rober...@google.com> wrote:

> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>
>> +dev
>>
>> Since we all agree that we should return something different than
>> PDone, the real question is what we should return.
>>
>
> My proposal is that one returns a PCollection<?> that consists,
> internally, of something contentless like nulls. This is future compatible
> with returning something more meaningful based on the source or write
> process itself, but at least this would be followable.
>
>
>> As a reminder we had a pretty interesting discussion about this
>> already in the past but uniformization of our return values has not
>> happened.
>> This thread is worth reading for Vincent or anyone who wants to
>> contribute Write transforms that return.
>>
>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>
>
> Yeah, we should go ahead and finally do something.
>
>
>>
>> > Returning PDone is an anti-pattern that should be avoided, but changing
>> it now would be backwards incompatible.
>>
>> Periodic reminder: most IOs are still Experimental, so I suppose it is
>> up to the maintainers to judge whether the upgrade to return something
>> other than PDone is worthwhile. In that case we can deprecate and remove
>> the previous signature in a short time (2 releases was the average for
>> previous cases).
>>
>>
>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>> <aromanenko....@gmail.com> wrote:
>> >
>> > I thought that was said about returning a PCollection of write results
>> as it’s done in other IOs (as I mentioned as examples) that have
>> _additional_ write methods, like “withWriteResults()” etc, that return
>> PTransform<…, PCollection<WriteResults>>.
>> > In this case, we keep backwards compatibility and just add new
>> functionality. Though, we need to follow the same pattern for user API and
>> maybe even naming for this feature across different IOs (like we have for
>> “readAll()” methods).
>> >
>> >  I agree that we have to avoid returning PDone for such cases.
>> >
>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <rober...@google.com> wrote:
>> >
>> > Returning PDone is an anti-pattern that should be avoided, but changing
>> it now would be backwards incompatible. PRs to add non-PDone returning
>> variants (probably as another option to the builders) that compose well
>> with Wait, etc. would be welcome.
>> >
>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>> aromanenko....@gmail.com> wrote:
>> >>
>> >> In this way, I think “Wait” PTransform should work for you but, as it
>> was mentioned before, it doesn’t work with PDone, only with PCollection as
>> a signal.
>> >>
>> >> Since you already adjusted your own writer for that, it would be great
>> to contribute it back to Beam the same way it was done for other IOs (for
>> example, JdbcIO [1] or BigtableIO [2]).
>> >>
>> >> In general, I think we need to have it for all IOs, at least to use
>> with “Wait” because this pattern is quite often required.
>> >>
>> >> [1]
>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>> >> [2]
>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>> >>
>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vincent.marq...@gmail.com>
>> wrote:
>> >>
>> >> No, it only needs to ensure that one record seen on Pubsub has
>> been successfully written to a database.  So "record by record" is fine, or even
>> "bundle".
>> >>
>> >> ~Vincent
>> >>
>> >>
>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>> aromanenko....@gmail.com> wrote:
>> >>>
>> >>> Do you want to wait until ALL records are written to Cassandra and
>> then write all successfully written records to PubSub, or should it be
>> performed "record by record"?
>> >>>
>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <vincent.marq...@gmail.com>
>> wrote:
>> >>>
>> >>> I have a common use case where my pipeline looks like this:
>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
>> >>>
>> >>> I do NOT want my pipeline to look like the following:
>> >>>
>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>> >>>                                                          |
>> >>>                                                           -> PubsubIO.write
>> >>>
>> >>> Because I need to ensure that only items written to Pubsub have
>> successfully finished a (quorum) write.
>> >>>
>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>> use it here so I often roll my own 'writer', but maybe there is a
>> recommended way of doing this?
>> >>>
>> >>> Thanks in advance for any help.
>> >>>
>> >>> ~Vincent
>> >>>
>> >>>
>> >>
>> >
>>
>
