This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b4a7a5d73e feat: optimize Spanner changestream metadata table (#32213)
6b4a7a5d73e is described below

commit 6b4a7a5d73eacb92fe1b225679c9aea5b89cdc44
Author: Thiago Nunes <[email protected]>
AuthorDate: Tue Aug 20 02:26:13 2024 +1000

    feat: optimize Spanner changestream metadata table (#32213)
    
    * feat: optimize Spanner changestream metadata table
    
    * fix: linting
    
    * tests: fixes admin dao tests
---
 .../dao/PartitionMetadataAdminDao.java             | 83 ++++++++++++++++++----
 .../dao/PartitionMetadataAdminDaoTest.java         | 29 +++++---
 2 files changed, 89 insertions(+), 23 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
index 8f0951d4ac5..368cab7022b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
@@ -23,7 +23,8 @@ import com.google.cloud.spanner.Dialect;
 import com.google.cloud.spanner.SpannerException;
 import com.google.cloud.spanner.SpannerExceptionFactory;
 import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -78,6 +79,12 @@ public class PartitionMetadataAdminDao {
    */
   public static final String COLUMN_FINISHED_AT = "FinishedAt";
 
+  /** Metadata table index for queries over the watermark column. */
+  public static final String WATERMARK_INDEX = "WatermarkIndex";
+
+  /** Metadata table index for queries over the created at / start timestamp columns. */
+  public static final String CREATED_AT_START_TIMESTAMP_INDEX = "CreatedAtStartTimestampIndex";
+
   private static final int TIMEOUT_MINUTES = 10;
   private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1;
 
@@ -117,10 +124,10 @@ public class PartitionMetadataAdminDao {
    * PartitionMetadataAdminDao#TTL_AFTER_PARTITION_FINISHED_DAYS} days.
    */
   public void createPartitionMetadataTable() {
-    String metadataCreateStmt = "";
+    List<String> ddl = new ArrayList<>();
     if (this.isPostgres()) {
       // Literals need be added around literals to preserve casing.
-      metadataCreateStmt =
+      ddl.add(
           "CREATE TABLE \""
               + tableName
               + "\"(\""
@@ -146,15 +153,37 @@ public class PartitionMetadataAdminDao {
               + "\" SPANNER.COMMIT_TIMESTAMP,\""
               + COLUMN_FINISHED_AT
               + "\" SPANNER.COMMIT_TIMESTAMP,"
-              + " PRIMARY KEY (\"PartitionToken\")"
+              + " PRIMARY KEY (\""
+              + COLUMN_PARTITION_TOKEN
+              + "\")"
               + ")"
               + " TTL INTERVAL '"
               + TTL_AFTER_PARTITION_FINISHED_DAYS
               + " days' ON \""
               + COLUMN_FINISHED_AT
-              + "\"";
+              + "\"");
+      ddl.add(
+          "CREATE INDEX \""
+              + WATERMARK_INDEX
+              + "\" on \""
+              + tableName
+              + "\" (\""
+              + COLUMN_WATERMARK
+              + "\") INCLUDE (\""
+              + COLUMN_STATE
+              + "\")");
+      ddl.add(
+          "CREATE INDEX \""
+              + CREATED_AT_START_TIMESTAMP_INDEX
+              + "\" ON \""
+              + tableName
+              + "\" (\""
+              + COLUMN_CREATED_AT
+              + "\",\""
+              + COLUMN_START_TIMESTAMP
+              + "\")");
     } else {
-      metadataCreateStmt =
+      ddl.add(
           "CREATE TABLE "
               + tableName
               + " ("
@@ -180,16 +209,37 @@ public class PartitionMetadataAdminDao {
               + " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
               + COLUMN_FINISHED_AT
               + " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
-              + ") PRIMARY KEY (PartitionToken),"
+              + ") PRIMARY KEY ("
+              + COLUMN_PARTITION_TOKEN
+              + "),"
               + " ROW DELETION POLICY (OLDER_THAN("
               + COLUMN_FINISHED_AT
               + ", INTERVAL "
               + TTL_AFTER_PARTITION_FINISHED_DAYS
-              + " DAY))";
+              + " DAY))");
+      ddl.add(
+          "CREATE INDEX "
+              + WATERMARK_INDEX
+              + " on "
+              + tableName
+              + " ("
+              + COLUMN_WATERMARK
+              + ") STORING ("
+              + COLUMN_STATE
+              + ")");
+      ddl.add(
+          "CREATE INDEX "
+              + CREATED_AT_START_TIMESTAMP_INDEX
+              + " ON "
+              + tableName
+              + " ("
+              + COLUMN_CREATED_AT
+              + ","
+              + COLUMN_START_TIMESTAMP
+              + ")");
     }
     OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
-        databaseAdminClient.updateDatabaseDdl(
-            instanceId, databaseId, Collections.singletonList(metadataCreateStmt), null);
+        databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
     try {
       // Initiate the request which returns an OperationFuture.
       op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
@@ -212,15 +262,18 @@ public class PartitionMetadataAdminDao {
    * PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes.
    */
   public void deletePartitionMetadataTable() {
-    String metadataDropStmt;
+    List<String> ddl = new ArrayList<>();
     if (this.isPostgres()) {
-      metadataDropStmt = "DROP TABLE \"" + tableName + "\"";
+      ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\"");
+      ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\"");
+      ddl.add("DROP TABLE \"" + tableName + "\"");
     } else {
-      metadataDropStmt = "DROP TABLE " + tableName;
+      ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX);
+      ddl.add("DROP INDEX " + WATERMARK_INDEX);
+      ddl.add("DROP TABLE " + tableName);
     }
     OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
-        databaseAdminClient.updateDatabaseDdl(
-            instanceId, databaseId, Collections.singletonList(metadataDropStmt), null);
+        databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
     try {
       // Initiate the request which returns an OperationFuture.
       op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
index 973bccb0e5c..3752c2fb3af 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
@@ -34,6 +34,7 @@ import com.google.cloud.spanner.ErrorCode;
 import com.google.cloud.spanner.SpannerException;
 import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.junit.Before;
@@ -86,8 +87,11 @@ public class PartitionMetadataAdminDaoTest {
     partitionMetadataAdminDao.createPartitionMetadataTable();
     verify(databaseAdminClient, times(1))
         .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
-    assertEquals(1, ((Collection<?>) statements.getValue()).size());
-    assertTrue(statements.getValue().iterator().next().contains("CREATE TABLE"));
+    assertEquals(3, ((Collection<?>) statements.getValue()).size());
+    Iterator<String> it = statements.getValue().iterator();
+    assertTrue(it.next().contains("CREATE TABLE"));
+    assertTrue(it.next().contains("CREATE INDEX"));
+    assertTrue(it.next().contains("CREATE INDEX"));
   }
 
   @Test
@@ -96,8 +100,11 @@ public class PartitionMetadataAdminDaoTest {
     partitionMetadataAdminDaoPostgres.createPartitionMetadataTable();
     verify(databaseAdminClient, times(1))
         .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
-    assertEquals(1, ((Collection<?>) statements.getValue()).size());
-    assertTrue(statements.getValue().iterator().next().contains("CREATE TABLE \""));
+    assertEquals(3, ((Collection<?>) statements.getValue()).size());
+    Iterator<String> it = statements.getValue().iterator();
+    assertTrue(it.next().contains("CREATE TABLE \""));
+    assertTrue(it.next().contains("CREATE INDEX \""));
+    assertTrue(it.next().contains("CREATE INDEX \""));
   }
 
   @Test
@@ -129,8 +136,11 @@ public class PartitionMetadataAdminDaoTest {
     partitionMetadataAdminDao.deletePartitionMetadataTable();
     verify(databaseAdminClient, times(1))
         .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
-    assertEquals(1, ((Collection<?>) statements.getValue()).size());
-    assertTrue(statements.getValue().iterator().next().contains("DROP TABLE"));
+    assertEquals(3, ((Collection<?>) statements.getValue()).size());
+    Iterator<String> it = statements.getValue().iterator();
+    assertTrue(it.next().contains("DROP INDEX"));
+    assertTrue(it.next().contains("DROP INDEX"));
+    assertTrue(it.next().contains("DROP TABLE"));
   }
 
   @Test
@@ -139,8 +149,11 @@ public class PartitionMetadataAdminDaoTest {
     partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable();
     verify(databaseAdminClient, times(1))
         .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
-    assertEquals(1, ((Collection<?>) statements.getValue()).size());
-    assertTrue(statements.getValue().iterator().next().contains("DROP TABLE \""));
+    assertEquals(3, ((Collection<?>) statements.getValue()).size());
+    Iterator<String> it = statements.getValue().iterator();
+    assertTrue(it.next().contains("DROP INDEX \""));
+    assertTrue(it.next().contains("DROP INDEX \""));
+    assertTrue(it.next().contains("DROP TABLE \""));
   }
 
   @Test

Reply via email to