This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cb4efc7 [BEAM-12164]: display the metadata table's name on UI
new e1a84ff Merge pull request #16845 from [BEAM-12164]: display the metadata table name for Spanner Change Streams Connector
cb4efc7 is described below
commit cb4efc71328181d93599b5cc33f03b6b5c2fd2db
Author: Hengfeng Li <[email protected]>
AuthorDate: Mon Feb 14 16:18:40 2022 +1100
[BEAM-12164]: display the metadata table's name on UI
---
.../apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 863d88a..94e8930 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -80,6 +80,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.schemas.Schema;
@@ -1442,6 +1443,7 @@ public class SpannerIO {
return toBuilder().setMetadataDatabase(metadataDatabase).build();
}
+ /** Specifies the metadata table name. */
public ReadChangeStream withMetadataTable(String metadataTable) {
return toBuilder().setMetadataTable(metadataTable).build();
}
@@ -1585,6 +1587,11 @@ public class SpannerIO {
new PostProcessingMetricsDoFn(metrics);
LOG.info("Partition metadata table that will be used is " +
partitionMetadataTableName);
+ input
+ .getPipeline()
+ .getOptions()
+ .as(SpannerChangeStreamOptions.class)
+ .setMetadataTable(partitionMetadataTableName);
return input
.apply(Impulse.create())
@@ -1596,6 +1603,19 @@ public class SpannerIO {
}
}
+ /**
+ * Interface to display the name of the metadata table on Dataflow UI. This is only used for
+ * internal purpose. This should not be used to pass the name of the metadata table.
+ */
+ public interface SpannerChangeStreamOptions extends StreamingOptions {
+
+ /** Returns the name of the metadata table. */
+ String getMetadataTable();
+
+ /** Specifies the name of the metadata table. */
+ void setMetadataTable(String table);
+ }
+
private static class ToMutationGroupFn extends DoFn<Mutation, MutationGroup>
{
@ProcessElement
public void processElement(ProcessContext c) {