This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cb4efc7  [BEAM-12164]: display the metadata table's name on UI
     new e1a84ff  Merge pull request #16845 from [BEAM-12164]: display the metadata table name for Spanner Change Streams Connector
cb4efc7 is described below

commit cb4efc71328181d93599b5cc33f03b6b5c2fd2db
Author: Hengfeng Li <[email protected]>
AuthorDate: Mon Feb 14 16:18:40 2022 +1100

    [BEAM-12164]: display the metadata table's name on UI
---
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java    | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 863d88a..94e8930 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -80,6 +80,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.schemas.Schema;
@@ -1442,6 +1443,7 @@ public class SpannerIO {
       return toBuilder().setMetadataDatabase(metadataDatabase).build();
     }
 
+    /** Specifies the metadata table name. */
     public ReadChangeStream withMetadataTable(String metadataTable) {
       return toBuilder().setMetadataTable(metadataTable).build();
     }
@@ -1585,6 +1587,11 @@ public class SpannerIO {
             new PostProcessingMetricsDoFn(metrics);
 
         LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName);
+        input
+            .getPipeline()
+            .getOptions()
+            .as(SpannerChangeStreamOptions.class)
+            .setMetadataTable(partitionMetadataTableName);
 
         return input
             .apply(Impulse.create())
@@ -1596,6 +1603,19 @@ public class SpannerIO {
     }
   }
 
+  /**
+   * Interface to display the name of the metadata table on Dataflow UI. This is only used for
+   * internal purpose. This should not be used to pass the name of the metadata table.
+   */
+  public interface SpannerChangeStreamOptions extends StreamingOptions {
+
+    /** Returns the name of the metadata table. */
+    String getMetadataTable();
+
+    /** Specifies the name of the metadata table. */
+    void setMetadataTable(String table);
+  }
+
   private static class ToMutationGroupFn extends DoFn<Mutation, MutationGroup> {
     @ProcessElement
     public void processElement(ProcessContext c) {

Reply via email to