Hi folks,

Wanted to give a heads up about the existence of a commonly requested
feature and its first successful production usage.

The feature is the Wait.on() transform [1], and the first successful
production usage is in Spanner [2].

The Wait.on() transform allows you to "do this, then that" - in the sense
that a.apply(Wait.on(signal)) re-emits PCollection "a", but only after the
PCollection "signal" is "done" in the same window (i.e. when no more
elements can arrive into the same window of "signal"). The PCollection
"signal" is typically a collection of results of some operation - so
Wait.on(signal) allows you to wait until that operation is done. It
transparently works correctly in streaming pipelines too.
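
To make the "same window" contract concrete, here is a toy, single-threaded
model in plain Java. It is not Beam code and not how Wait.on() is
implemented; the class WaitOnModel and its methods offer() and signalDone()
are names made up for this sketch, and windows are just strings. It only
illustrates that main-input elements for a window are held back until the
signal for that same window is done.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of the Wait.on() contract (hypothetical, not part of Beam). */
final class WaitOnModel<T> {
  // Main-input elements that are being held back, keyed by window name.
  private final Map<String, List<T>> pending = new HashMap<>();
  // Windows of the signal that can no longer receive elements.
  private final Set<String> doneWindows = new HashSet<>();

  /** Offers a main-input element; returns the elements that may be emitted now. */
  List<T> offer(String window, T element) {
    if (doneWindows.contains(window)) {
      // The signal for this window is already done, so pass the element through.
      return Collections.singletonList(element);
    }
    pending.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
    return Collections.emptyList();
  }

  /** Marks the signal's window as done and releases the elements held for it. */
  List<T> signalDone(String window) {
    doneWindows.add(window);
    List<T> released = pending.remove(window);
    return released == null ? Collections.<T>emptyList() : released;
  }

  public static void main(String[] args) {
    WaitOnModel<String> gate = new WaitOnModel<>();
    System.out.println(gate.offer("w1", "a"));   // held: signal window w1 still open
    System.out.println(gate.offer("w2", "b"));   // held: signal window w2 still open
    System.out.println(gate.signalDone("w1"));   // releases what was held for w1 only
    System.out.println(gate.offer("w1", "c"));   // passes straight through
  }
}
```

Note that closing "w1" does not release anything held for "w2": each window
of the main collection waits only on the matching window of the signal.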

This may sound a little convoluted, so the example from the documentation
should help.

PCollection<Void> firstWriteResults =
    data.apply(ParDo.of(...write to first database...));
data.apply(Wait.on(firstWriteResults))
    // Windows of this intermediate PCollection will be processed no earlier
    // than when the respective window of firstWriteResults closes.
    .apply(ParDo.of(...write to second database...));

This is indeed what Spanner folks have done, and AFAIK they intend this for
importing multiple dependent database tables - e.g. first import a parent
table; when it's done, import the child table - all within one pipeline.
You can see example code in the tests [3].

Please note that this requires support from the IO connector:
IO.write() has to return a result that can be waited on. The code of
SpannerIO is a great example; another example is FileIO.write().
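
For the FileIO.write() case, a minimal sketch could look like the following.
It assumes beam-sdks-java-core and a runner (e.g. the direct runner) are on
the classpath; the class name WriteThenContinue, the helper method and the
output directory argument are made up for illustration. The signal here is
the PCollection of output filenames that the write result exposes.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical example class; only the Beam calls themselves are real API. */
public class WriteThenContinue {

  /** Writes data as text files, then re-emits data once the write is done. */
  static PCollection<String> writeThenContinue(PCollection<String> data, String outputDir) {
    // FileIO.write() returns a result object rather than PDone,
    // which is what makes it possible to wait on it.
    WriteFilesResult<Void> written =
        data.apply(FileIO.<String>write().via(TextIO.sink()).to(outputDir));

    // Elements of "data" are released only after the corresponding window
    // of the written-filenames collection is done.
    return data.apply(Wait.on(written.getPerDestinationOutputFilenames()));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<String> data = p.apply(Create.of("a", "b", "c"));
    // "/tmp/wait-on-demo" is an arbitrary directory chosen for this sketch.
    writeThenContinue(data, "/tmp/wait-on-demo");
    // Follow-up steps (e.g. a second write) would be applied to the returned collection.
    p.run().waitUntilFinish();
  }
}
```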

People have asked for similar support in the Bigtable and BigQuery
connectors, but it's not there yet. It would be really cool if somebody
added it to these connectors or others (I think there was a recent thread
discussing how to add it to BigQueryIO).

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
[2] https://github.com/apache/beam/pull/4264
[3]
https://github.com/apache/beam/blob/a3ce091b3bbebf724c63be910bd3bc4cede4d11f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java#L158
