Yeah, the entire input is not always what is needed, and can generally be
achieved via

    input -> wait(side input of write) -> do something with the input

Of course one could also do

entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
CombineGlobally(TrivialCombineFn)

to reduce this to a more minimal set with at least one element per Window.

The file writing operations emit the actual files that were written, which
can be handy. My suggestion of PCollection<?> was just so that we can emit
something usable, and decide exactly what is the most useful is later.


On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:

> I believe that the Wait transform turns this output into a side input, so
> outputting the input PCollection might be problematic.
>
> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>> just reading really quickly so sorry if I missed something...
>>
>> Checking out the code for the WriteFn<T> I see a big problem:
>>
>>     @Setup
>>     public void setup() {
>>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>     }
>>
>>     @ProcessElement
>>       public void processElement(ProcessContext c) throws
>> ExecutionException, InterruptedException {
>>       writer.mutate(c.element());
>>     }
>>
>>     @Teardown
>>     public void teardown() throws Exception {
>>       writer.close();
>>       writer = null;
>>     }
>>
>> It is only in writer.close() that all async writes are waited on. This
>> needs to happen in @FinishBundle.
>>
>> Did you discover this when implementing your own Cassandra.Write?
>>
>> Until you have waited on the future, you should not output the element as
>> "has been written". And you cannot output from the @TearDown method which
>> is just for cleaning up resources.
>>
>> Am I reading this wrong?
>>
>> Kenn
>>
>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <ajam...@google.com> wrote:
>>
>>> How about a PCollection containing every element which was successfully
>>> written?
>>> Basically the same things which were passed into it.
>>>
>>> Then you could act on every element after its been successfully written
>>> to the sink.
>>>
>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>
>>>>> +dev
>>>>>
>>>>> Since we all agree that we should return something different than
>>>>> PDone the real question is what should we return.
>>>>>
>>>>
>>>> My proposal is that one returns a PCollection<?> that consists,
>>>> internally, of something contentless like nulls. This is future compatible
>>>> with returning something more maningful based on the source source or write
>>>> process itself, but at least this would be followable.
>>>>
>>>>
>>>>> As a reminder we had a pretty interesting discussion about this
>>>>> already in the past but uniformization of our return values has not
>>>>> happened.
>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>> contribute Write transforms that return.
>>>>>
>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>
>>>>
>>>> Yeah, we should go ahead and finally do something.
>>>>
>>>>
>>>>>
>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>> changing it now would be backwards incompatible.
>>>>>
>>>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>>>> worth to the maintainers to judge if the upgrade to return someething
>>>>> different of PDone is worth, in that case we can deprecate and remove
>>>>> the previous signature in short time (2 releases was the average for
>>>>> previous cases).
>>>>>
>>>>>
>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>> <aromanenko....@gmail.com> wrote:
>>>>> >
>>>>> > I thought that was said about returning a PCollection of write
>>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>>> PTransform<…, PCollection<WriteResults>>.
>>>>> > In this case, we keep backwards compatibility and just add new
>>>>> funtionality. Though, we need to follow the same pattern for user API and
>>>>> maybe even naming for this feature across different IOs (like we have for
>>>>> "readAll()” methods).
>>>>> >
>>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>>> >
>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>> >
>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>>> returning variants (probably as another option to the builders) that
>>>>> compose well with Wait, etc. would be welcome.
>>>>> >
>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>>> aromanenko....@gmail.com> wrote:
>>>>> >>
>>>>> >> In this way, I think “Wait” PTransform should work for you but, as
>>>>> it was mentioned before, it doesn’t work with PDone, only with PCollection
>>>>> as a signal.
>>>>> >>
>>>>> >> Since you already adjusted your own writer for that, it would be
>>>>> great to contribute it back to Beam in the way as it was done for other 
>>>>> IOs
>>>>> (for example, JdbcIO [1] or BigtableIO [2])
>>>>> >>
>>>>> >> In general, I think we need to have it for all IOs, at least to use
>>>>> with “Wait” because this pattern it's quite often required.
>>>>> >>
>>>>> >> [1]
>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>> >> [2]
>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>> >>
>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <
>>>>> vincent.marq...@gmail.com> wrote:
>>>>> >>
>>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>>> successfully written to a database.  So "record by record" is fine, or 
>>>>> even
>>>>> "bundle".
>>>>> >>
>>>>> >> ~Vincent
>>>>> >>
>>>>> >>
>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>>> aromanenko....@gmail.com> wrote:
>>>>> >>>
>>>>> >>> Do you want to wait for ALL records are written for Cassandra and
>>>>> then write all successfully written records to PubSub or it should be
>>>>> performed "record by record"?
>>>>> >>>
>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>>> vincent.marq...@gmail.com> wrote:
>>>>> >>>
>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>>> PubSubIO.write
>>>>> >>>
>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>> >>>
>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>> >>>                                                          |
>>>>> >>>                                                           ->
>>>>> PubsubIO.write
>>>>> >>>
>>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>>> successfully finished a (quorum) write.
>>>>> >>>
>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>>>>> use it here so I often roll my own 'writer', but maybe there is a
>>>>> recommended way of doing this?
>>>>> >>>
>>>>> >>> Thanks in advance for any help.
>>>>> >>>
>>>>> >>> ~Vincent
>>>>> >>>
>>>>> >>>
>>>>> >>
>>>>> >
>>>>>
>>>>

Reply via email to