tianz101 commented on code in PR #36667:
URL: https://github.com/apache/beam/pull/36667#discussion_r2479444689


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java:
##########
@@ -138,4 +166,76 @@ public ChangeStreamResultSet changeStreamQuery(
   private boolean isPostgres() {
     return this.dialect == Dialect.POSTGRESQL;
   }
+
+  // Returns the PartitionMode, fetching from Spanner on first call and 
caching.
+  protected PartitionMode getPartitionMode() {
+    if (this.partitionMode != PartitionMode.UNKNOWN) {
+      return this.partitionMode;
+    }
+    synchronized (this) {
+      if (this.partitionMode == PartitionMode.UNKNOWN) {
+        String fetchedPartitionMode =
+            fetchPartitionMode(this.databaseClient, this.dialect, 
this.changeStreamName);
+        if (fetchedPartitionMode.isEmpty()
+            || fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
+          this.partitionMode = PartitionMode.IMMUTABLE_KEY_RANGE;
+        } else {
+          this.partitionMode = PartitionMode.MUTABLE_KEY_RANGE;
+        }
+      }
+    }
+    return this.partitionMode;
+  }
+
+  // Convenience boolean method kept for compatibility
+  protected boolean isMutableKeyRangeChangeStream() {
+    return getPartitionMode() == PartitionMode.MUTABLE_KEY_RANGE;
+  }
+
+  // Returns the partition_mode option value for the given change stream.
+  private static String fetchPartitionMode(
+      DatabaseClient databaseClient, Dialect dialect, String changeStreamName) 
{
+    try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
+      Statement statement;
+      if (dialect == Dialect.POSTGRESQL) {
+        statement =
+            Statement.newBuilder(
+                    "select option_name, option_value\n"
+                        + "from information_schema.change_stream_options\n"
+                        + "where change_stream_name = $1")

Review Comment:
   Can we add `option_name = 'partition_mode'` to this WHERE clause so we get the
partition mode directly, with no need to filter the results later?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java:
##########
@@ -91,8 +107,20 @@ public ChangeStreamResultSet changeStreamQuery(
     String query = "";
     Statement statement;
     if (this.isPostgres()) {
-      query =
-          "SELECT * FROM \"spanner\".\"read_json_" + changeStreamName + 
"\"($1, $2, $3, $4, null)";
+      // Ensure we have determined whether change stream uses mutable key range
+      boolean isMutable = isMutableKeyRangeChangeStream();

Review Comment:
   Is this called each time we issue the query? Because our query is constantly
checkpointed, canceled, and re-issued, this could end up being called many times.
   
   I think we only need to query the partition mode once for the whole pipeline.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java:
##########
@@ -138,4 +166,76 @@ public ChangeStreamResultSet changeStreamQuery(
   private boolean isPostgres() {
     return this.dialect == Dialect.POSTGRESQL;
   }
+
+  // Returns the PartitionMode, fetching from Spanner on first call and 
caching.
+  protected PartitionMode getPartitionMode() {
+    if (this.partitionMode != PartitionMode.UNKNOWN) {
+      return this.partitionMode;
+    }
+    synchronized (this) {
+      if (this.partitionMode == PartitionMode.UNKNOWN) {
+        String fetchedPartitionMode =
+            fetchPartitionMode(this.databaseClient, this.dialect, 
this.changeStreamName);
+        if (fetchedPartitionMode.isEmpty()
+            || fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
+          this.partitionMode = PartitionMode.IMMUTABLE_KEY_RANGE;
+        } else {
+          this.partitionMode = PartitionMode.MUTABLE_KEY_RANGE;
+        }
+      }
+    }
+    return this.partitionMode;
+  }
+
+  // Convenience boolean method kept for compatibility
+  protected boolean isMutableKeyRangeChangeStream() {
+    return getPartitionMode() == PartitionMode.MUTABLE_KEY_RANGE;
+  }
+
+  // Returns the partition_mode option value for the given change stream.
+  private static String fetchPartitionMode(
+      DatabaseClient databaseClient, Dialect dialect, String changeStreamName) 
{
+    try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
+      Statement statement;
+      if (dialect == Dialect.POSTGRESQL) {
+        statement =
+            Statement.newBuilder(
+                    "select option_name, option_value\n"
+                        + "from information_schema.change_stream_options\n"
+                        + "where change_stream_name = $1")
+                .bind("p1")
+                .to(changeStreamName)
+                .build();
+      } else {
+        statement =
+            Statement.newBuilder(
+                    "select option_name, option_value\n"
+                        + "from information_schema.change_stream_options\n"
+                        + "where change_stream_name = @changeStreamName")

Review Comment:
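   Same comment here: please add `option_name = 'partition_mode'` to this GoogleSQL query's WHERE clause as well, so the partition mode is fetched directly without filtering later.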



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to