This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6b4a7a5d73e feat: optimize Spanner changestream metadata table (#32213)
6b4a7a5d73e is described below
commit 6b4a7a5d73eacb92fe1b225679c9aea5b89cdc44
Author: Thiago Nunes <[email protected]>
AuthorDate: Tue Aug 20 02:26:13 2024 +1000
feat: optimize Spanner changestream metadata table (#32213)
* feat: optimize Spanner changestream metadata table
* fix: linting
* tests: fixes admin dao tests
---
.../dao/PartitionMetadataAdminDao.java | 83 ++++++++++++++++++----
.../dao/PartitionMetadataAdminDaoTest.java | 29 +++++---
2 files changed, 89 insertions(+), 23 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
index 8f0951d4ac5..368cab7022b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
@@ -23,7 +23,8 @@ import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -78,6 +79,12 @@ public class PartitionMetadataAdminDao {
*/
public static final String COLUMN_FINISHED_AT = "FinishedAt";
+ /** Metadata table index for queries over the watermark column. */
+ public static final String WATERMARK_INDEX = "WatermarkIndex";
+
+ /** Metadata table index for queries over the created at / start timestamp columns. */
+ public static final String CREATED_AT_START_TIMESTAMP_INDEX = "CreatedAtStartTimestampIndex";
+
private static final int TIMEOUT_MINUTES = 10;
private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1;
@@ -117,10 +124,10 @@ public class PartitionMetadataAdminDao {
* PartitionMetadataAdminDao#TTL_AFTER_PARTITION_FINISHED_DAYS} days.
*/
public void createPartitionMetadataTable() {
- String metadataCreateStmt = "";
+ List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
// Literals need be added around literals to preserve casing.
- metadataCreateStmt =
+ ddl.add(
"CREATE TABLE \""
+ tableName
+ "\"(\""
@@ -146,15 +153,37 @@ public class PartitionMetadataAdminDao {
+ "\" SPANNER.COMMIT_TIMESTAMP,\""
+ COLUMN_FINISHED_AT
+ "\" SPANNER.COMMIT_TIMESTAMP,"
- + " PRIMARY KEY (\"PartitionToken\")"
+ + " PRIMARY KEY (\""
+ + COLUMN_PARTITION_TOKEN
+ + "\")"
+ ")"
+ " TTL INTERVAL '"
+ TTL_AFTER_PARTITION_FINISHED_DAYS
+ " days' ON \""
+ COLUMN_FINISHED_AT
- + "\"";
+ + "\"");
+ ddl.add(
+ "CREATE INDEX \""
+ + WATERMARK_INDEX
+ + "\" on \""
+ + tableName
+ + "\" (\""
+ + COLUMN_WATERMARK
+ + "\") INCLUDE (\""
+ + COLUMN_STATE
+ + "\")");
+ ddl.add(
+ "CREATE INDEX \""
+ + CREATED_AT_START_TIMESTAMP_INDEX
+ + "\" ON \""
+ + tableName
+ + "\" (\""
+ + COLUMN_CREATED_AT
+ + "\",\""
+ + COLUMN_START_TIMESTAMP
+ + "\")");
} else {
- metadataCreateStmt =
+ ddl.add(
"CREATE TABLE "
+ tableName
+ " ("
@@ -180,16 +209,37 @@ public class PartitionMetadataAdminDao {
+ " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
+ COLUMN_FINISHED_AT
+ " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
- + ") PRIMARY KEY (PartitionToken),"
+ + ") PRIMARY KEY ("
+ + COLUMN_PARTITION_TOKEN
+ + "),"
+ " ROW DELETION POLICY (OLDER_THAN("
+ COLUMN_FINISHED_AT
+ ", INTERVAL "
+ TTL_AFTER_PARTITION_FINISHED_DAYS
- + " DAY))";
+ + " DAY))");
+ ddl.add(
+ "CREATE INDEX "
+ + WATERMARK_INDEX
+ + " on "
+ + tableName
+ + " ("
+ + COLUMN_WATERMARK
+ + ") STORING ("
+ + COLUMN_STATE
+ + ")");
+ ddl.add(
+ "CREATE INDEX "
+ + CREATED_AT_START_TIMESTAMP_INDEX
+ + " ON "
+ + tableName
+ + " ("
+ + COLUMN_CREATED_AT
+ + ","
+ + COLUMN_START_TIMESTAMP
+ + ")");
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
- databaseAdminClient.updateDatabaseDdl(
- instanceId, databaseId, Collections.singletonList(metadataCreateStmt), null);
+ databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
try {
// Initiate the request which returns an OperationFuture.
op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
@@ -212,15 +262,18 @@ public class PartitionMetadataAdminDao {
* PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes.
*/
public void deletePartitionMetadataTable() {
- String metadataDropStmt;
+ List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
- metadataDropStmt = "DROP TABLE \"" + tableName + "\"";
+ ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\"");
+ ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\"");
+ ddl.add("DROP TABLE \"" + tableName + "\"");
} else {
- metadataDropStmt = "DROP TABLE " + tableName;
+ ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX);
+ ddl.add("DROP INDEX " + WATERMARK_INDEX);
+ ddl.add("DROP TABLE " + tableName);
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
- databaseAdminClient.updateDatabaseDdl(
- instanceId, databaseId, Collections.singletonList(metadataDropStmt), null);
+ databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
try {
// Initiate the request which returns an OperationFuture.
op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
index 973bccb0e5c..3752c2fb3af 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
@@ -34,6 +34,7 @@ import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import java.util.Collection;
+import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
@@ -86,8 +87,11 @@ public class PartitionMetadataAdminDaoTest {
partitionMetadataAdminDao.createPartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
- assertEquals(1, ((Collection<?>) statements.getValue()).size());
- assertTrue(statements.getValue().iterator().next().contains("CREATE TABLE"));
+ assertEquals(3, ((Collection<?>) statements.getValue()).size());
+ Iterator<String> it = statements.getValue().iterator();
+ assertTrue(it.next().contains("CREATE TABLE"));
+ assertTrue(it.next().contains("CREATE INDEX"));
+ assertTrue(it.next().contains("CREATE INDEX"));
}
@Test
@@ -96,8 +100,11 @@ public class PartitionMetadataAdminDaoTest {
partitionMetadataAdminDaoPostgres.createPartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
- assertEquals(1, ((Collection<?>) statements.getValue()).size());
- assertTrue(statements.getValue().iterator().next().contains("CREATE TABLE \""));
+ assertEquals(3, ((Collection<?>) statements.getValue()).size());
+ Iterator<String> it = statements.getValue().iterator();
+ assertTrue(it.next().contains("CREATE TABLE \""));
+ assertTrue(it.next().contains("CREATE INDEX \""));
+ assertTrue(it.next().contains("CREATE INDEX \""));
}
@Test
@@ -129,8 +136,11 @@ public class PartitionMetadataAdminDaoTest {
partitionMetadataAdminDao.deletePartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
- assertEquals(1, ((Collection<?>) statements.getValue()).size());
- assertTrue(statements.getValue().iterator().next().contains("DROP TABLE"));
+ assertEquals(3, ((Collection<?>) statements.getValue()).size());
+ Iterator<String> it = statements.getValue().iterator();
+ assertTrue(it.next().contains("DROP INDEX"));
+ assertTrue(it.next().contains("DROP INDEX"));
+ assertTrue(it.next().contains("DROP TABLE"));
}
@Test
@@ -139,8 +149,11 @@ public class PartitionMetadataAdminDaoTest {
partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
- assertEquals(1, ((Collection<?>) statements.getValue()).size());
- assertTrue(statements.getValue().iterator().next().contains("DROP TABLE \""));
+ assertEquals(3, ((Collection<?>) statements.getValue()).size());
+ Iterator<String> it = statements.getValue().iterator();
+ assertTrue(it.next().contains("DROP INDEX \""));
+ assertTrue(it.next().contains("DROP INDEX \""));
+ assertTrue(it.next().contains("DROP TABLE \""));
}
@Test