Yeah, the entire input is not always what is needed, and it can generally be
achieved via

    input -> wait(side input of write) -> do something with the input

Of course one could also do

    entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) -> CombineGlobally(TrivialCombineFn)

to reduce this to a more minimal set with at least one element per window.

The file writing operations emit the actual files that were written, which can
be handy. My suggestion of PCollection<?> was just so that we can emit
something usable now and decide later exactly what is most useful.

On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:

> I believe that the Wait transform turns this output into a side input, so
> outputting the input PCollection might be problematic.
>
> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>> just reading really quickly so sorry if I missed something...
>>
>> Checking out the code for the WriteFn<T> I see a big problem:
>>
>> @Setup
>> public void setup() {
>>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>> }
>>
>> @ProcessElement
>> public void processElement(ProcessContext c)
>>     throws ExecutionException, InterruptedException {
>>   writer.mutate(c.element());
>> }
>>
>> @Teardown
>> public void teardown() throws Exception {
>>   writer.close();
>>   writer = null;
>> }
>>
>> It is only in writer.close() that all async writes are waited on. This
>> needs to happen in @FinishBundle.
>>
>> Did you discover this when implementing your own Cassandra.Write?
>>
>> Until you have waited on the future, you should not output the element as
>> "has been written". And you cannot output from the @Teardown method, which
>> is just for cleaning up resources.
>>
>> Am I reading this wrong?
>>
>> Kenn
>>
>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <ajam...@google.com> wrote:
>>
>>> How about a PCollection containing every element which was successfully
>>> written?
>>> Basically the same things which were passed into it.
>>>
>>> Then you could act on every element after it's been successfully written
>>> to the sink.
>>>
>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>
>>>>> +dev
>>>>>
>>>>> Since we all agree that we should return something different than
>>>>> PDone, the real question is what we should return.
>>>>>
>>>>
>>>> My proposal is that one returns a PCollection<?> that consists,
>>>> internally, of something contentless like nulls. This is future compatible
>>>> with returning something more meaningful based on the source or write
>>>> process itself, but at least this would be followable.
>>>>
>>>>
>>>>> As a reminder, we had a pretty interesting discussion about this
>>>>> already in the past, but uniformization of our return values has not
>>>>> happened.
>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>> contribute Write transforms that return.
>>>>>
>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>
>>>>
>>>> Yeah, we should go ahead and finally do something.
>>>>
>>>>
>>>>>
>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>> changing it now would be backwards incompatible.
>>>>>
>>>>> Periodic reminder: most IOs are still Experimental, so I suppose it is
>>>>> up to the maintainers to judge whether the upgrade to return something
>>>>> other than PDone is worth it. In that case we can deprecate and remove
>>>>> the previous signature in a short time (2 releases was the average for
>>>>> previous cases).
>>>>>
>>>>>
>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>> <aromanenko....@gmail.com> wrote:
>>>>> >
>>>>> > I thought that was said about returning a PCollection of write
>>>>> results as it's done in other IOs (as I mentioned as examples) that have
>>>>> _additional_ write methods, like "withWriteResults()" etc., that return
>>>>> PTransform<…, PCollection<WriteResults>>.
>>>>> > In this case, we keep backwards compatibility and just add new
>>>>> functionality. Though, we need to follow the same pattern for the user API
>>>>> and maybe even the naming for this feature across different IOs (like we
>>>>> have for "readAll()" methods).
>>>>> >
>>>>> > I agree that we have to avoid returning PDone for such cases.
>>>>> >
>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>> >
>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>>> returning variants (probably as another option to the builders) that
>>>>> compose well with Wait, etc. would be welcome.
>>>>> >
>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>>> aromanenko....@gmail.com> wrote:
>>>>> >>
>>>>> >> In this way, I think the "Wait" PTransform should work for you but, as
>>>>> was mentioned before, it doesn't work with PDone, only with a PCollection
>>>>> as a signal.
>>>>> >>
>>>>> >> Since you already adjusted your own writer for that, it would be
>>>>> great to contribute it back to Beam in the same way as was done for other
>>>>> IOs
>>>>> (for example, JdbcIO [1] or BigtableIO [2]).
>>>>> >>
>>>>> >> In general, I think we need to have it for all IOs, at least to use
>>>>> with "Wait", because this pattern is quite often required.
>>>>> >>
>>>>> >> [1]
>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>> >> [2]
>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>> >>
>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <
>>>>> vincent.marq...@gmail.com> wrote:
>>>>> >>
>>>>> >> No, it only needs to ensure that a record seen on Pubsub has been
>>>>> successfully written to the database. So "record by record" is fine, or
>>>>> even
>>>>> "bundle".
>>>>> >>
>>>>> >> ~Vincent
>>>>> >>
>>>>> >>
>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>>> aromanenko....@gmail.com> wrote:
>>>>> >>>
>>>>> >>> Do you want to wait until ALL records are written to Cassandra and
>>>>> then write all successfully written records to PubSub, or should it be
>>>>> performed "record by record"?
>>>>> >>>
>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>>> vincent.marq...@gmail.com> wrote:
>>>>> >>>
>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>>> PubsubIO.write
>>>>> >>>
>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>> >>>
>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>> >>>                            |
>>>>> >>>                             -> PubsubIO.write
>>>>> >>>
>>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>>> successfully finished a (quorum) write.
>>>>> >>>
>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>>>>> use it here, so I often roll my own 'writer', but maybe there is a
>>>>> recommended way of doing this?
>>>>> >>>
>>>>> >>> Thanks in advance for any help.
>>>>> >>>
>>>>> >>> ~Vincent
>>>>> >>>
>>>>> >>>
>>>>> >>
>>>>> >
>>>>>
>>>>
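
To make Kenn's point about @FinishBundle concrete, here is a minimal sketch in plain Java with no Beam dependency. It is not the CassandraIO code: the class name FlushOnFinishWriter and the asyncSave function are hypothetical stand-ins (asyncSave plays the role of something like Mapper::saveAsync), and the two methods only model what a DoFn's @ProcessElement and @FinishBundle methods would do. The idea is that an element is only reported as written after its future has completed, so a downstream step such as a Pubsub write never sees an element whose write is still in flight.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Beam-free model of a write step that emits elements only after their async write completed. */
class FlushOnFinishWriter<T> {
  // Hypothetical stand-in for an async save call such as Mapper::saveAsync.
  private final Function<T, CompletableFuture<Void>> asyncSave;
  private final List<T> pendingElements = new ArrayList<>();
  private final List<CompletableFuture<Void>> pendingWrites = new ArrayList<>();

  FlushOnFinishWriter(Function<T, CompletableFuture<Void>> asyncSave) {
    this.asyncSave = asyncSave;
  }

  /** Models @ProcessElement: start the write and remember it, but emit nothing yet. */
  void processElement(T element) {
    pendingElements.add(element);
    pendingWrites.add(asyncSave.apply(element));
  }

  /**
   * Models @FinishBundle: block until every write started in this bundle has completed,
   * then return the elements that can now be emitted as "has been written".
   * If any write failed, join() throws and nothing is emitted.
   */
  List<T> finishBundle() {
    CompletableFuture.allOf(pendingWrites.toArray(new CompletableFuture[0])).join();
    List<T> written = new ArrayList<>(pendingElements);
    pendingElements.clear();
    pendingWrites.clear();
    return written;
  }

  public static void main(String[] args) {
    // In-memory "database" so the sketch runs without Cassandra.
    List<String> store = Collections.synchronizedList(new ArrayList<>());
    FlushOnFinishWriter<String> writer =
        new FlushOnFinishWriter<>(e -> CompletableFuture.runAsync(() -> store.add(e)));
    writer.processElement("a");
    writer.processElement("b");
    System.out.println(writer.finishBundle());
  }
}
```

In a real DoFn the list returned by finishBundle() would be output from the @FinishBundle method (with an explicit timestamp and window), and the resulting PCollection is what a downstream Wait or Pubsub write could consume in place of PDone.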