This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ac87d7b48e8 fix: generate random index name for change streams (#32689)
ac87d7b48e8 is described below
commit ac87d7b48e86e0c3e863d13b5e8d52469134a446
Author: Thiago Nunes <[email protected]>
AuthorDate: Mon Oct 21 18:51:09 2024 +1100
fix: generate random index name for change streams (#32689)
Generates index names for the change stream partition metadata table using a
random UUID. This prevents index name conflicts when the job is redeployed in
an existing database.
---
.../apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 19 ++-
.../gcp/spanner/changestreams/NameGenerator.java | 52 --------
.../gcp/spanner/changestreams/dao/DaoFactory.java | 12 +-
.../dao/PartitionMetadataAdminDao.java | 58 ++++-----
.../changestreams/dao/PartitionMetadataDao.java | 35 +++++
.../dao/PartitionMetadataTableNames.java | 144 +++++++++++++++++++++
.../dofn/CleanUpReadChangeStreamDoFn.java | 4 +-
.../spanner/changestreams/dofn/InitializeDoFn.java | 1 +
.../spanner/changestreams/NameGeneratorTest.java | 41 ------
.../dao/PartitionMetadataAdminDaoTest.java | 56 ++++++--
.../dao/PartitionMetadataTableNamesTest.java | 73 +++++++++++
11 files changed, 344 insertions(+), 151 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 435bbba9ae8..d9dde11a308 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -25,7 +25,6 @@ import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsCons
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
-import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
@@ -61,6 +60,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -77,6 +77,7 @@ import
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn;
@@ -1772,9 +1773,13 @@ public class SpannerIO {
+ fullPartitionMetadataDatabaseId
+ " has dialect "
+ metadataDatabaseDialect);
- final String partitionMetadataTableName =
- MoreObjects.firstNonNull(
- getMetadataTable(),
generatePartitionMetadataTableName(partitionMetadataDatabaseId));
+ PartitionMetadataTableNames partitionMetadataTableNames =
+ Optional.ofNullable(getMetadataTable())
+ .map(
+ table ->
+ PartitionMetadataTableNames.fromExistingTable(
+ partitionMetadataDatabaseId, table))
+
.orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));
final String changeStreamName = getChangeStreamName();
final Timestamp startTimestamp = getInclusiveStartAt();
// Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add
1ns to transform the
@@ -1791,7 +1796,7 @@ public class SpannerIO {
changeStreamSpannerConfig,
changeStreamName,
partitionMetadataSpannerConfig,
- partitionMetadataTableName,
+ partitionMetadataTableNames,
rpcPriority,
input.getPipeline().getOptions().getJobName(),
changeStreamDatabaseDialect,
@@ -1807,7 +1812,9 @@ public class SpannerIO {
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);
- LOG.info("Partition metadata table that will be used is " +
partitionMetadataTableName);
+ LOG.info(
+ "Partition metadata table that will be used is "
+ + partitionMetadataTableNames.getTableName());
final PCollection<byte[]> impulseOut = input.apply(Impulse.create());
final PCollection<PartitionMetadata> partitionsOut =
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java
deleted file mode 100644
index 322e85cb07a..00000000000
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner.changestreams;
-
-import java.util.UUID;
-
-/**
- * This class generates a unique name for the partition metadata table, which
is created when the
- * Connector is initialized.
- */
-public class NameGenerator {
-
- private static final String PARTITION_METADATA_TABLE_NAME_FORMAT =
"Metadata_%s_%s";
- private static final int MAX_TABLE_NAME_LENGTH = 63;
-
- /**
- * Generates an unique name for the partition metadata table in the form of
{@code
- * "Metadata_<databaseId>_<uuid>"}.
- *
- * @param databaseId The database id where the table will be created
- * @return the unique generated name of the partition metadata table
- */
- public static String generatePartitionMetadataTableName(String databaseId) {
- // There are 11 characters in the name format.
- // Maximum Spanner database ID length is 30 characters.
- // UUID always generates a String with 36 characters.
- // Since the Postgres table name length is 63, we may need to truncate the
table name depending
- // on the database length.
- String fullString =
- String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId,
UUID.randomUUID())
- .replaceAll("-", "_");
- if (fullString.length() < MAX_TABLE_NAME_LENGTH) {
- return fullString;
- }
- return fullString.substring(0, MAX_TABLE_NAME_LENGTH);
- }
-}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
index b9718fdb675..787abad02e0 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
@@ -44,7 +44,7 @@ public class DaoFactory implements Serializable {
private final SpannerConfig metadataSpannerConfig;
private final String changeStreamName;
- private final String partitionMetadataTableName;
+ private final PartitionMetadataTableNames partitionMetadataTableNames;
private final RpcPriority rpcPriority;
private final String jobName;
private final Dialect spannerChangeStreamDatabaseDialect;
@@ -56,7 +56,7 @@ public class DaoFactory implements Serializable {
* @param changeStreamSpannerConfig the configuration for the change streams
DAO
* @param changeStreamName the name of the change stream for the change
streams DAO
* @param metadataSpannerConfig the metadata tables configuration
- * @param partitionMetadataTableName the name of the created partition
metadata table
+ * @param partitionMetadataTableNames the names of the partition metadata
ddl objects
* @param rpcPriority the priority of the requests made by the DAO queries
* @param jobName the name of the running job
*/
@@ -64,7 +64,7 @@ public class DaoFactory implements Serializable {
SpannerConfig changeStreamSpannerConfig,
String changeStreamName,
SpannerConfig metadataSpannerConfig,
- String partitionMetadataTableName,
+ PartitionMetadataTableNames partitionMetadataTableNames,
RpcPriority rpcPriority,
String jobName,
Dialect spannerChangeStreamDatabaseDialect,
@@ -78,7 +78,7 @@ public class DaoFactory implements Serializable {
this.changeStreamSpannerConfig = changeStreamSpannerConfig;
this.changeStreamName = changeStreamName;
this.metadataSpannerConfig = metadataSpannerConfig;
- this.partitionMetadataTableName = partitionMetadataTableName;
+ this.partitionMetadataTableNames = partitionMetadataTableNames;
this.rpcPriority = rpcPriority;
this.jobName = jobName;
this.spannerChangeStreamDatabaseDialect =
spannerChangeStreamDatabaseDialect;
@@ -102,7 +102,7 @@ public class DaoFactory implements Serializable {
databaseAdminClient,
metadataSpannerConfig.getInstanceId().get(),
metadataSpannerConfig.getDatabaseId().get(),
- partitionMetadataTableName,
+ partitionMetadataTableNames,
this.metadataDatabaseDialect);
}
return partitionMetadataAdminDao;
@@ -120,7 +120,7 @@ public class DaoFactory implements Serializable {
if (partitionMetadataDaoInstance == null) {
partitionMetadataDaoInstance =
new PartitionMetadataDao(
- this.partitionMetadataTableName,
+ this.partitionMetadataTableNames.getTableName(),
spannerAccessor.getDatabaseClient(),
this.metadataDatabaseDialect);
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
index 368cab7022b..3e6045d8858 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
@@ -79,19 +79,13 @@ public class PartitionMetadataAdminDao {
*/
public static final String COLUMN_FINISHED_AT = "FinishedAt";
- /** Metadata table index for queries over the watermark column. */
- public static final String WATERMARK_INDEX = "WatermarkIndex";
-
- /** Metadata table index for queries over the created at / start timestamp
columns. */
- public static final String CREATED_AT_START_TIMESTAMP_INDEX =
"CreatedAtStartTimestampIndex";
-
private static final int TIMEOUT_MINUTES = 10;
private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1;
private final DatabaseAdminClient databaseAdminClient;
private final String instanceId;
private final String databaseId;
- private final String tableName;
+ private final PartitionMetadataTableNames names;
private final Dialect dialect;
/**
@@ -101,18 +95,18 @@ public class PartitionMetadataAdminDao {
* table
* @param instanceId the instance where the metadata table will reside
* @param databaseId the database where the metadata table will reside
- * @param tableName the name of the metadata table
+ * @param names the names of the metadata table ddl objects
*/
PartitionMetadataAdminDao(
DatabaseAdminClient databaseAdminClient,
String instanceId,
String databaseId,
- String tableName,
+ PartitionMetadataTableNames names,
Dialect dialect) {
this.databaseAdminClient = databaseAdminClient;
this.instanceId = instanceId;
this.databaseId = databaseId;
- this.tableName = tableName;
+ this.names = names;
this.dialect = dialect;
}
@@ -128,8 +122,8 @@ public class PartitionMetadataAdminDao {
if (this.isPostgres()) {
// Literals need be added around literals to preserve casing.
ddl.add(
- "CREATE TABLE \""
- + tableName
+ "CREATE TABLE IF NOT EXISTS \""
+ + names.getTableName()
+ "\"(\""
+ COLUMN_PARTITION_TOKEN
+ "\" text NOT NULL,\""
@@ -163,20 +157,20 @@ public class PartitionMetadataAdminDao {
+ COLUMN_FINISHED_AT
+ "\"");
ddl.add(
- "CREATE INDEX \""
- + WATERMARK_INDEX
+ "CREATE INDEX IF NOT EXISTS \""
+ + names.getWatermarkIndexName()
+ "\" on \""
- + tableName
+ + names.getTableName()
+ "\" (\""
+ COLUMN_WATERMARK
+ "\") INCLUDE (\""
+ COLUMN_STATE
+ "\")");
ddl.add(
- "CREATE INDEX \""
- + CREATED_AT_START_TIMESTAMP_INDEX
+ "CREATE INDEX IF NOT EXISTS \""
+ + names.getCreatedAtIndexName()
+ "\" ON \""
- + tableName
+ + names.getTableName()
+ "\" (\""
+ COLUMN_CREATED_AT
+ "\",\""
@@ -184,8 +178,8 @@ public class PartitionMetadataAdminDao {
+ "\")");
} else {
ddl.add(
- "CREATE TABLE "
- + tableName
+ "CREATE TABLE IF NOT EXISTS "
+ + names.getTableName()
+ " ("
+ COLUMN_PARTITION_TOKEN
+ " STRING(MAX) NOT NULL,"
@@ -218,20 +212,20 @@ public class PartitionMetadataAdminDao {
+ TTL_AFTER_PARTITION_FINISHED_DAYS
+ " DAY))");
ddl.add(
- "CREATE INDEX "
- + WATERMARK_INDEX
+ "CREATE INDEX IF NOT EXISTS "
+ + names.getWatermarkIndexName()
+ " on "
- + tableName
+ + names.getTableName()
+ " ("
+ COLUMN_WATERMARK
+ ") STORING ("
+ COLUMN_STATE
+ ")");
ddl.add(
- "CREATE INDEX "
- + CREATED_AT_START_TIMESTAMP_INDEX
+ "CREATE INDEX IF NOT EXISTS "
+ + names.getCreatedAtIndexName()
+ " ON "
- + tableName
+ + names.getTableName()
+ " ("
+ COLUMN_CREATED_AT
+ ","
@@ -261,16 +255,14 @@ public class PartitionMetadataAdminDao {
* Drops the metadata table. This operation should complete in {@link
* PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes.
*/
- public void deletePartitionMetadataTable() {
+ public void deletePartitionMetadataTable(List<String> indexes) {
List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
- ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\"");
- ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\"");
- ddl.add("DROP TABLE \"" + tableName + "\"");
+ indexes.forEach(index -> ddl.add("DROP INDEX \"" + index + "\""));
+ ddl.add("DROP TABLE \"" + names.getTableName() + "\"");
} else {
- ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX);
- ddl.add("DROP INDEX " + WATERMARK_INDEX);
- ddl.add("DROP TABLE " + tableName);
+ indexes.forEach(index -> ddl.add("DROP INDEX " + index));
+ ddl.add("DROP TABLE " + names.getTableName());
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl,
null);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index 7867932cd1a..654fd946663 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -96,6 +96,41 @@ public class PartitionMetadataDao {
}
}
+ /**
+ * Finds all indexes for the metadata table.
+ *
+ * @return a list of index names for the metadata table.
+ */
+ public List<String> findAllTableIndexes() {
+ String indexesStmt;
+ if (this.isPostgres()) {
+ indexesStmt =
+ "SELECT index_name FROM information_schema.indexes"
+ + " WHERE table_schema = 'public'"
+ + " AND table_name = '"
+ + metadataTableName
+ + "' AND index_type != 'PRIMARY_KEY'";
+ } else {
+ indexesStmt =
+ "SELECT index_name FROM information_schema.indexes"
+ + " WHERE table_schema = ''"
+ + " AND table_name = '"
+ + metadataTableName
+ + "' AND index_type != 'PRIMARY_KEY'";
+ }
+
+ List<String> result = new ArrayList<>();
+ try (ResultSet queryResultSet =
+ databaseClient
+ .singleUseReadOnlyTransaction()
+ .executeQuery(Statement.of(indexesStmt),
Options.tag("query=findAllTableIndexes"))) {
+ while (queryResultSet.next()) {
+ result.add(queryResultSet.getString("index_name"));
+ }
+ }
+ return result;
+ }
+
/**
* Fetches the partition metadata row data for the given partition token.
*
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java
new file mode 100644
index 00000000000..07d7b80676d
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Configuration for a partition metadata table. It encapsulates the name of
the metadata table and
+ * indexes.
+ */
+public class PartitionMetadataTableNames implements Serializable {
+
+ private static final long serialVersionUID = 8848098877671834584L;
+
+ /** PostgreSQL max table and index name length is 63 bytes. */
+ @VisibleForTesting static final int MAX_NAME_LENGTH = 63;
+
+ private static final String PARTITION_METADATA_TABLE_NAME_FORMAT =
"Metadata_%s_%s";
+ private static final String WATERMARK_INDEX_NAME_FORMAT =
"WatermarkIdx_%s_%s";
+ private static final String CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT =
"CreatedAtIdx_%s_%s";
+
+ /**
+ * Generates a unique name for the partition metadata table and its indexes.
The table name will
+ * be in the form of {@code "Metadata_<databaseId>_<uuid>"}. The watermark
index will be in the
+ * form of {@code "WatermarkIdx_<databaseId>_<uuid>"}. The createdAt / start
timestamp index will
+ * be in the form of {@code "CreatedAtIdx_<databaseId>_<uuid>"}.
+ *
+ * @param databaseId The database id where the table will be created
+ * @return the unique generated names of the partition metadata ddl
+ */
+ public static PartitionMetadataTableNames generateRandom(String databaseId) {
+ UUID uuid = UUID.randomUUID();
+
+ String table = generateName(PARTITION_METADATA_TABLE_NAME_FORMAT,
databaseId, uuid);
+ String watermarkIndex = generateName(WATERMARK_INDEX_NAME_FORMAT,
databaseId, uuid);
+ String createdAtIndex =
+ generateName(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId,
uuid);
+
+ return new PartitionMetadataTableNames(table, watermarkIndex,
createdAtIndex);
+ }
+
+ /**
+ * Encapsulates a selected table name. Index names are generated, but will
only be used if the
+ * given table does not exist. The watermark index will be in the form of
{@code
+ * "WatermarkIdx_<databaseId>_<uuid>"}. The createdAt / start timestamp index
will be in the form
+ * of {@code "CreatedAtIdx_<databaseId>_<uuid>"}.
+ *
+ * @param databaseId The database id for the table
+ * @param table The table name to be used
+ * @return an instance with the table name and generated index names
+ */
+ public static PartitionMetadataTableNames fromExistingTable(String
databaseId, String table) {
+ UUID uuid = UUID.randomUUID();
+
+ String watermarkIndex = generateName(WATERMARK_INDEX_NAME_FORMAT,
databaseId, uuid);
+ String createdAtIndex =
+ generateName(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId,
uuid);
+ return new PartitionMetadataTableNames(table, watermarkIndex,
createdAtIndex);
+ }
+
+ private static String generateName(String template, String databaseId, UUID
uuid) {
+ String name = String.format(template, databaseId, uuid).replaceAll("-",
"_");
+ if (name.length() > MAX_NAME_LENGTH) {
+ return name.substring(0, MAX_NAME_LENGTH);
+ }
+ return name;
+ }
+
+ private final String tableName;
+ private final String watermarkIndexName;
+ private final String createdAtIndexName;
+
+ public PartitionMetadataTableNames(
+ String tableName, String watermarkIndexName, String createdAtIndexName) {
+ this.tableName = tableName;
+ this.watermarkIndexName = watermarkIndexName;
+ this.createdAtIndexName = createdAtIndexName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getWatermarkIndexName() {
+ return watermarkIndexName;
+ }
+
+ public String getCreatedAtIndexName() {
+ return createdAtIndexName;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof PartitionMetadataTableNames)) {
+ return false;
+ }
+ PartitionMetadataTableNames that = (PartitionMetadataTableNames) o;
+ return Objects.equals(tableName, that.tableName)
+ && Objects.equals(watermarkIndexName, that.watermarkIndexName)
+ && Objects.equals(createdAtIndexName, that.createdAtIndexName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableName, watermarkIndexName, createdAtIndexName);
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionMetadataTableNames{"
+ + "tableName='"
+ + tableName
+ + '\''
+ + ", watermarkIndexName='"
+ + watermarkIndexName
+ + '\''
+ + ", createdAtIndexName='"
+ + createdAtIndexName
+ + '\''
+ + '}';
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java
index a048c885a00..f8aa497292b 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;
import java.io.Serializable;
+import java.util.List;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.transforms.DoFn;
@@ -33,6 +34,7 @@ public class CleanUpReadChangeStreamDoFn extends DoFn<byte[],
Void> implements S
@ProcessElement
public void processElement(OutputReceiver<Void> receiver) {
- daoFactory.getPartitionMetadataAdminDao().deletePartitionMetadataTable();
+ List<String> indexes =
daoFactory.getPartitionMetadataDao().findAllTableIndexes();
+
daoFactory.getPartitionMetadataAdminDao().deletePartitionMetadataTable(indexes);
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
index 387ffd603b1..ca93f34bf1b 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
@@ -64,6 +64,7 @@ public class InitializeDoFn extends DoFn<byte[],
PartitionMetadata> implements S
public void processElement(OutputReceiver<PartitionMetadata> receiver) {
PartitionMetadataDao partitionMetadataDao =
daoFactory.getPartitionMetadataDao();
if (!partitionMetadataDao.tableExists()) {
+ // Creates partition metadata table and associated indexes
daoFactory.getPartitionMetadataAdminDao().createPartitionMetadataTable();
createFakeParentPartition();
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java
deleted file mode 100644
index f15fc530737..00000000000
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner.changestreams;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-public class NameGeneratorTest {
- private static final int MAXIMUM_POSTGRES_TABLE_NAME_LENGTH = 63;
-
- @Test
- public void testGenerateMetadataTableNameRemovesHyphens() {
- final String tableName =
-
NameGenerator.generatePartitionMetadataTableName("my-database-id-12345");
- assertFalse(tableName.contains("-"));
- }
-
- @Test
- public void testGenerateMetadataTableNameIsShorterThan64Characters() {
- final String tableName =
-
NameGenerator.generatePartitionMetadataTableName("my-database-id1-maximum-length");
- assertTrue(tableName.length() <= MAXIMUM_POSTGRES_TABLE_NAME_LENGTH);
- }
-}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
index 3752c2fb3af..02b9d111583 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
@@ -33,7 +33,9 @@ import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -58,6 +60,8 @@ public class PartitionMetadataAdminDaoTest {
private static final String DATABASE_ID = "SPANNER_DATABASE";
private static final String TABLE_NAME = "SPANNER_TABLE";
+ private static final String WATERMARK_INDEX_NAME = "WATERMARK_INDEX";
+ private static final String CREATED_AT_INDEX_NAME = "CREATED_AT_INDEX";
private static final int TIMEOUT_MINUTES = 10;
@@ -68,12 +72,14 @@ public class PartitionMetadataAdminDaoTest {
@Before
public void setUp() {
databaseAdminClient = mock(DatabaseAdminClient.class);
+ PartitionMetadataTableNames names =
+ new PartitionMetadataTableNames(TABLE_NAME, WATERMARK_INDEX_NAME,
CREATED_AT_INDEX_NAME);
partitionMetadataAdminDao =
new PartitionMetadataAdminDao(
- databaseAdminClient, INSTANCE_ID, DATABASE_ID, TABLE_NAME,
Dialect.GOOGLE_STANDARD_SQL);
+ databaseAdminClient, INSTANCE_ID, DATABASE_ID, names,
Dialect.GOOGLE_STANDARD_SQL);
partitionMetadataAdminDaoPostgres =
new PartitionMetadataAdminDao(
- databaseAdminClient, INSTANCE_ID, DATABASE_ID, TABLE_NAME,
Dialect.POSTGRESQL);
+ databaseAdminClient, INSTANCE_ID, DATABASE_ID, names,
Dialect.POSTGRESQL);
op = (OperationFuture<Void, UpdateDatabaseDdlMetadata>)
mock(OperationFuture.class);
statements = ArgumentCaptor.forClass(Iterable.class);
when(databaseAdminClient.updateDatabaseDdl(
@@ -89,9 +95,9 @@ public class PartitionMetadataAdminDaoTest {
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID),
statements.capture(), isNull());
assertEquals(3, ((Collection<?>) statements.getValue()).size());
Iterator<String> it = statements.getValue().iterator();
- assertTrue(it.next().contains("CREATE TABLE"));
- assertTrue(it.next().contains("CREATE INDEX"));
- assertTrue(it.next().contains("CREATE INDEX"));
+ assertTrue(it.next().contains("CREATE TABLE IF NOT EXISTS"));
+ assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS"));
+ assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS"));
}
@Test
@@ -102,9 +108,9 @@ public class PartitionMetadataAdminDaoTest {
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID),
statements.capture(), isNull());
assertEquals(3, ((Collection<?>) statements.getValue()).size());
Iterator<String> it = statements.getValue().iterator();
- assertTrue(it.next().contains("CREATE TABLE \""));
- assertTrue(it.next().contains("CREATE INDEX \""));
- assertTrue(it.next().contains("CREATE INDEX \""));
+ assertTrue(it.next().contains("CREATE TABLE IF NOT EXISTS \""));
+ assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS \""));
+ assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS \""));
}
@Test
@@ -133,7 +139,8 @@ public class PartitionMetadataAdminDaoTest {
@Test
public void testDeletePartitionMetadataTable() throws Exception {
when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null);
- partitionMetadataAdminDao.deletePartitionMetadataTable();
+ partitionMetadataAdminDao.deletePartitionMetadataTable(
+ Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME));
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID),
statements.capture(), isNull());
assertEquals(3, ((Collection<?>) statements.getValue()).size());
@@ -143,10 +150,22 @@ public class PartitionMetadataAdminDaoTest {
assertTrue(it.next().contains("DROP TABLE"));
}
+ @Test
+ public void testDeletePartitionMetadataTableWithNoIndexes() throws Exception
{
+ when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null);
+
partitionMetadataAdminDao.deletePartitionMetadataTable(Collections.emptyList());
+ verify(databaseAdminClient, times(1))
+ .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID),
statements.capture(), isNull());
+ assertEquals(1, ((Collection<?>) statements.getValue()).size());
+ Iterator<String> it = statements.getValue().iterator();
+ assertTrue(it.next().contains("DROP TABLE"));
+ }
+
@Test
public void testDeletePartitionMetadataTablePostgres() throws Exception {
when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null);
- partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable();
+ partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable(
+ Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME));
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID),
statements.capture(), isNull());
assertEquals(3, ((Collection<?>) statements.getValue()).size());
@@ -156,11 +175,23 @@ public class PartitionMetadataAdminDaoTest {
assertTrue(it.next().contains("DROP TABLE \""));
}
+ @Test
+ public void testDeletePartitionMetadataTablePostgresWithNoIndexes() throws
Exception {
+ when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null);
+
partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable(Collections.emptyList());
+ verify(databaseAdminClient, times(1))
+ .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID),
statements.capture(), isNull());
+ assertEquals(1, ((Collection<?>) statements.getValue()).size());
+ Iterator<String> it = statements.getValue().iterator();
+ assertTrue(it.next().contains("DROP TABLE \""));
+ }
+
@Test
public void testDeletePartitionMetadataTableWithTimeoutException() throws
Exception {
when(op.get(10, TimeUnit.MINUTES)).thenThrow(new
TimeoutException(TIMED_OUT));
try {
- partitionMetadataAdminDao.deletePartitionMetadataTable();
+ partitionMetadataAdminDao.deletePartitionMetadataTable(
+ Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME));
fail();
} catch (SpannerException e) {
assertTrue(e.getMessage().contains(TIMED_OUT));
@@ -171,7 +202,8 @@ public class PartitionMetadataAdminDaoTest {
public void testDeletePartitionMetadataTableWithInterruptedException()
throws Exception {
when(op.get(10, TimeUnit.MINUTES)).thenThrow(new
InterruptedException(INTERRUPTED));
try {
- partitionMetadataAdminDao.deletePartitionMetadataTable();
+ partitionMetadataAdminDao.deletePartitionMetadataTable(
+ Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME));
fail();
} catch (SpannerException e) {
assertEquals(ErrorCode.CANCELLED, e.getErrorCode());
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java
new file mode 100644
index 00000000000..2aae5b26a2c
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames.MAX_NAME_LENGTH;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class PartitionMetadataTableNamesTest {
+ @Test
+ public void testGeneratePartitionMetadataNamesRemovesHyphens() {
+ String databaseId = "my-database-id-12345";
+
+ PartitionMetadataTableNames names1 =
PartitionMetadataTableNames.generateRandom(databaseId);
+ assertFalse(names1.getTableName().contains("-"));
+ assertFalse(names1.getWatermarkIndexName().contains("-"));
+ assertFalse(names1.getCreatedAtIndexName().contains("-"));
+
+ PartitionMetadataTableNames names2 =
PartitionMetadataTableNames.generateRandom(databaseId);
+ assertNotEquals(names1.getTableName(), names2.getTableName());
+ assertNotEquals(names1.getWatermarkIndexName(),
names2.getWatermarkIndexName());
+ assertNotEquals(names1.getCreatedAtIndexName(),
names2.getCreatedAtIndexName());
+ }
+
+ @Test
+ public void testGeneratePartitionMetadataNamesIsShorterThan64Characters() {
+ PartitionMetadataTableNames names =
+ PartitionMetadataTableNames.generateRandom(
+
"my-database-id-larger-than-maximum-length-1234567890-1234567890-1234567890");
+ assertTrue(names.getTableName().length() <= MAX_NAME_LENGTH);
+ assertTrue(names.getWatermarkIndexName().length() <= MAX_NAME_LENGTH);
+ assertTrue(names.getCreatedAtIndexName().length() <= MAX_NAME_LENGTH);
+
+ names = PartitionMetadataTableNames.generateRandom("d");
+ assertTrue(names.getTableName().length() <= MAX_NAME_LENGTH);
+ assertTrue(names.getWatermarkIndexName().length() <= MAX_NAME_LENGTH);
+ assertTrue(names.getCreatedAtIndexName().length() <= MAX_NAME_LENGTH);
+ }
+
+ @Test
+ public void testPartitionMetadataNamesFromExistingTable() {
+ PartitionMetadataTableNames names1 =
+ PartitionMetadataTableNames.fromExistingTable("databaseid", "mytable");
+ assertEquals("mytable", names1.getTableName());
+ assertFalse(names1.getWatermarkIndexName().contains("-"));
+ assertFalse(names1.getCreatedAtIndexName().contains("-"));
+
+ PartitionMetadataTableNames names2 =
+ PartitionMetadataTableNames.fromExistingTable("databaseid", "mytable");
+ assertEquals("mytable", names2.getTableName());
+ assertNotEquals(names1.getWatermarkIndexName(),
names2.getWatermarkIndexName());
+ assertNotEquals(names1.getCreatedAtIndexName(),
names2.getCreatedAtIndexName());
+ }
+}