This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ac87d7b48e8 fix: generate random index name for change streams (#32689)
ac87d7b48e8 is described below

commit ac87d7b48e86e0c3e863d13b5e8d52469134a446
Author: Thiago Nunes <[email protected]>
AuthorDate: Mon Oct 21 18:51:09 2024 +1100

    fix: generate random index name for change streams (#32689)
    
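    Generates index names for the change stream partition metadata table
    using a random UUID, instead of the fixed names "WatermarkIndex" and
    "CreatedAtStartTimestampIndex". This prevents index-name collisions when
    the job is redeployed against an existing database.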
---
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java  |  19 ++-
 .../gcp/spanner/changestreams/NameGenerator.java   |  52 --------
 .../gcp/spanner/changestreams/dao/DaoFactory.java  |  12 +-
 .../dao/PartitionMetadataAdminDao.java             |  58 ++++-----
 .../changestreams/dao/PartitionMetadataDao.java    |  35 +++++
 .../dao/PartitionMetadataTableNames.java           | 144 +++++++++++++++++++++
 .../dofn/CleanUpReadChangeStreamDoFn.java          |   4 +-
 .../spanner/changestreams/dofn/InitializeDoFn.java |   1 +
 .../spanner/changestreams/NameGeneratorTest.java   |  41 ------
 .../dao/PartitionMetadataAdminDaoTest.java         |  56 ++++++--
 .../dao/PartitionMetadataTableNamesTest.java       |  73 +++++++++++
 11 files changed, 344 insertions(+), 151 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 435bbba9ae8..d9dde11a308 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -25,7 +25,6 @@ import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsCons
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
-import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
@@ -61,6 +60,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -77,6 +77,7 @@ import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn;
@@ -1772,9 +1773,13 @@ public class SpannerIO {
               + fullPartitionMetadataDatabaseId
               + " has dialect "
               + metadataDatabaseDialect);
-      final String partitionMetadataTableName =
-          MoreObjects.firstNonNull(
-              getMetadataTable(), 
generatePartitionMetadataTableName(partitionMetadataDatabaseId));
+      PartitionMetadataTableNames partitionMetadataTableNames =
+          Optional.ofNullable(getMetadataTable())
+              .map(
+                  table ->
+                      PartitionMetadataTableNames.fromExistingTable(
+                          partitionMetadataDatabaseId, table))
+              
.orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));
       final String changeStreamName = getChangeStreamName();
       final Timestamp startTimestamp = getInclusiveStartAt();
       // Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 
1ns to transform the
@@ -1791,7 +1796,7 @@ public class SpannerIO {
               changeStreamSpannerConfig,
               changeStreamName,
               partitionMetadataSpannerConfig,
-              partitionMetadataTableName,
+              partitionMetadataTableNames,
               rpcPriority,
               input.getPipeline().getOptions().getJobName(),
               changeStreamDatabaseDialect,
@@ -1807,7 +1812,9 @@ public class SpannerIO {
       final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
           new PostProcessingMetricsDoFn(metrics);
 
-      LOG.info("Partition metadata table that will be used is " + 
partitionMetadataTableName);
+      LOG.info(
+          "Partition metadata table that will be used is "
+              + partitionMetadataTableNames.getTableName());
 
       final PCollection<byte[]> impulseOut = input.apply(Impulse.create());
       final PCollection<PartitionMetadata> partitionsOut =
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java
deleted file mode 100644
index 322e85cb07a..00000000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner.changestreams;
-
-import java.util.UUID;
-
-/**
- * This class generates a unique name for the partition metadata table, which 
is created when the
- * Connector is initialized.
- */
-public class NameGenerator {
-
-  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = 
"Metadata_%s_%s";
-  private static final int MAX_TABLE_NAME_LENGTH = 63;
-
-  /**
-   * Generates an unique name for the partition metadata table in the form of 
{@code
-   * "Metadata_<databaseId>_<uuid>"}.
-   *
-   * @param databaseId The database id where the table will be created
-   * @return the unique generated name of the partition metadata table
-   */
-  public static String generatePartitionMetadataTableName(String databaseId) {
-    // There are 11 characters in the name format.
-    // Maximum Spanner database ID length is 30 characters.
-    // UUID always generates a String with 36 characters.
-    // Since the Postgres table name length is 63, we may need to truncate the 
table name depending
-    // on the database length.
-    String fullString =
-        String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, 
UUID.randomUUID())
-            .replaceAll("-", "_");
-    if (fullString.length() < MAX_TABLE_NAME_LENGTH) {
-      return fullString;
-    }
-    return fullString.substring(0, MAX_TABLE_NAME_LENGTH);
-  }
-}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
index b9718fdb675..787abad02e0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
@@ -44,7 +44,7 @@ public class DaoFactory implements Serializable {
   private final SpannerConfig metadataSpannerConfig;
 
   private final String changeStreamName;
-  private final String partitionMetadataTableName;
+  private final PartitionMetadataTableNames partitionMetadataTableNames;
   private final RpcPriority rpcPriority;
   private final String jobName;
   private final Dialect spannerChangeStreamDatabaseDialect;
@@ -56,7 +56,7 @@ public class DaoFactory implements Serializable {
    * @param changeStreamSpannerConfig the configuration for the change streams 
DAO
    * @param changeStreamName the name of the change stream for the change 
streams DAO
    * @param metadataSpannerConfig the metadata tables configuration
-   * @param partitionMetadataTableName the name of the created partition 
metadata table
+   * @param partitionMetadataTableNames the names of the partition metadata 
ddl objects
    * @param rpcPriority the priority of the requests made by the DAO queries
    * @param jobName the name of the running job
    */
@@ -64,7 +64,7 @@ public class DaoFactory implements Serializable {
       SpannerConfig changeStreamSpannerConfig,
       String changeStreamName,
       SpannerConfig metadataSpannerConfig,
-      String partitionMetadataTableName,
+      PartitionMetadataTableNames partitionMetadataTableNames,
       RpcPriority rpcPriority,
       String jobName,
       Dialect spannerChangeStreamDatabaseDialect,
@@ -78,7 +78,7 @@ public class DaoFactory implements Serializable {
     this.changeStreamSpannerConfig = changeStreamSpannerConfig;
     this.changeStreamName = changeStreamName;
     this.metadataSpannerConfig = metadataSpannerConfig;
-    this.partitionMetadataTableName = partitionMetadataTableName;
+    this.partitionMetadataTableNames = partitionMetadataTableNames;
     this.rpcPriority = rpcPriority;
     this.jobName = jobName;
     this.spannerChangeStreamDatabaseDialect = 
spannerChangeStreamDatabaseDialect;
@@ -102,7 +102,7 @@ public class DaoFactory implements Serializable {
               databaseAdminClient,
               metadataSpannerConfig.getInstanceId().get(),
               metadataSpannerConfig.getDatabaseId().get(),
-              partitionMetadataTableName,
+              partitionMetadataTableNames,
               this.metadataDatabaseDialect);
     }
     return partitionMetadataAdminDao;
@@ -120,7 +120,7 @@ public class DaoFactory implements Serializable {
     if (partitionMetadataDaoInstance == null) {
       partitionMetadataDaoInstance =
           new PartitionMetadataDao(
-              this.partitionMetadataTableName,
+              this.partitionMetadataTableNames.getTableName(),
               spannerAccessor.getDatabaseClient(),
               this.metadataDatabaseDialect);
     }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
index 368cab7022b..3e6045d8858 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java
@@ -79,19 +79,13 @@ public class PartitionMetadataAdminDao {
    */
   public static final String COLUMN_FINISHED_AT = "FinishedAt";
 
-  /** Metadata table index for queries over the watermark column. */
-  public static final String WATERMARK_INDEX = "WatermarkIndex";
-
-  /** Metadata table index for queries over the created at / start timestamp 
columns. */
-  public static final String CREATED_AT_START_TIMESTAMP_INDEX = 
"CreatedAtStartTimestampIndex";
-
   private static final int TIMEOUT_MINUTES = 10;
   private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1;
 
   private final DatabaseAdminClient databaseAdminClient;
   private final String instanceId;
   private final String databaseId;
-  private final String tableName;
+  private final PartitionMetadataTableNames names;
   private final Dialect dialect;
 
   /**
@@ -101,18 +95,18 @@ public class PartitionMetadataAdminDao {
    *     table
    * @param instanceId the instance where the metadata table will reside
    * @param databaseId the database where the metadata table will reside
-   * @param tableName the name of the metadata table
+   * @param names the names of the metadata table ddl objects
    */
   PartitionMetadataAdminDao(
       DatabaseAdminClient databaseAdminClient,
       String instanceId,
       String databaseId,
-      String tableName,
+      PartitionMetadataTableNames names,
       Dialect dialect) {
     this.databaseAdminClient = databaseAdminClient;
     this.instanceId = instanceId;
     this.databaseId = databaseId;
-    this.tableName = tableName;
+    this.names = names;
     this.dialect = dialect;
   }
 
@@ -128,8 +122,8 @@ public class PartitionMetadataAdminDao {
     if (this.isPostgres()) {
       // Literals need be added around literals to preserve casing.
       ddl.add(
-          "CREATE TABLE \""
-              + tableName
+          "CREATE TABLE IF NOT EXISTS \""
+              + names.getTableName()
               + "\"(\""
               + COLUMN_PARTITION_TOKEN
               + "\" text NOT NULL,\""
@@ -163,20 +157,20 @@ public class PartitionMetadataAdminDao {
               + COLUMN_FINISHED_AT
               + "\"");
       ddl.add(
-          "CREATE INDEX \""
-              + WATERMARK_INDEX
+          "CREATE INDEX IF NOT EXISTS \""
+              + names.getWatermarkIndexName()
               + "\" on \""
-              + tableName
+              + names.getTableName()
               + "\" (\""
               + COLUMN_WATERMARK
               + "\") INCLUDE (\""
               + COLUMN_STATE
               + "\")");
       ddl.add(
-          "CREATE INDEX \""
-              + CREATED_AT_START_TIMESTAMP_INDEX
+          "CREATE INDEX IF NOT EXISTS \""
+              + names.getCreatedAtIndexName()
               + "\" ON \""
-              + tableName
+              + names.getTableName()
               + "\" (\""
               + COLUMN_CREATED_AT
               + "\",\""
@@ -184,8 +178,8 @@ public class PartitionMetadataAdminDao {
               + "\")");
     } else {
       ddl.add(
-          "CREATE TABLE "
-              + tableName
+          "CREATE TABLE IF NOT EXISTS "
+              + names.getTableName()
               + " ("
               + COLUMN_PARTITION_TOKEN
               + " STRING(MAX) NOT NULL,"
@@ -218,20 +212,20 @@ public class PartitionMetadataAdminDao {
               + TTL_AFTER_PARTITION_FINISHED_DAYS
               + " DAY))");
       ddl.add(
-          "CREATE INDEX "
-              + WATERMARK_INDEX
+          "CREATE INDEX IF NOT EXISTS "
+              + names.getWatermarkIndexName()
               + " on "
-              + tableName
+              + names.getTableName()
               + " ("
               + COLUMN_WATERMARK
               + ") STORING ("
               + COLUMN_STATE
               + ")");
       ddl.add(
-          "CREATE INDEX "
-              + CREATED_AT_START_TIMESTAMP_INDEX
+          "CREATE INDEX IF NOT EXISTS "
+              + names.getCreatedAtIndexName()
               + " ON "
-              + tableName
+              + names.getTableName()
               + " ("
               + COLUMN_CREATED_AT
               + ","
@@ -261,16 +255,14 @@ public class PartitionMetadataAdminDao {
    * Drops the metadata table. This operation should complete in {@link
    * PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes.
    */
-  public void deletePartitionMetadataTable() {
+  public void deletePartitionMetadataTable(List<String> indexes) {
     List<String> ddl = new ArrayList<>();
     if (this.isPostgres()) {
-      ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\"");
-      ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\"");
-      ddl.add("DROP TABLE \"" + tableName + "\"");
+      indexes.forEach(index -> ddl.add("DROP INDEX \"" + index + "\""));
+      ddl.add("DROP TABLE \"" + names.getTableName() + "\"");
     } else {
-      ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX);
-      ddl.add("DROP INDEX " + WATERMARK_INDEX);
-      ddl.add("DROP TABLE " + tableName);
+      indexes.forEach(index -> ddl.add("DROP INDEX " + index));
+      ddl.add("DROP TABLE " + names.getTableName());
     }
     OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
         databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, 
null);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index 7867932cd1a..654fd946663 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -96,6 +96,41 @@ public class PartitionMetadataDao {
     }
   }
 
+  /**
+   * Finds all indexes for the metadata table.
+   *
+   * @return a list of index names for the metadata table.
+   */
+  public List<String> findAllTableIndexes() {
+    String indexesStmt;
+    if (this.isPostgres()) {
+      indexesStmt =
+          "SELECT index_name FROM information_schema.indexes"
+              + " WHERE table_schema = 'public'"
+              + " AND table_name = '"
+              + metadataTableName
+              + "' AND index_type != 'PRIMARY_KEY'";
+    } else {
+      indexesStmt =
+          "SELECT index_name FROM information_schema.indexes"
+              + " WHERE table_schema = ''"
+              + " AND table_name = '"
+              + metadataTableName
+              + "' AND index_type != 'PRIMARY_KEY'";
+    }
+
+    List<String> result = new ArrayList<>();
+    try (ResultSet queryResultSet =
+        databaseClient
+            .singleUseReadOnlyTransaction()
+            .executeQuery(Statement.of(indexesStmt), 
Options.tag("query=findAllTableIndexes"))) {
+      while (queryResultSet.next()) {
+        result.add(queryResultSet.getString("index_name"));
+      }
+    }
+    return result;
+  }
+
   /**
    * Fetches the partition metadata row data for the given partition token.
    *
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java
new file mode 100644
index 00000000000..07d7b80676d
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Configuration for a partition metadata table. It encapsulates the name of 
the metadata table and
+ * indexes.
+ */
+public class PartitionMetadataTableNames implements Serializable {
+
+  private static final long serialVersionUID = 8848098877671834584L;
+
+  /** PostgreSQL max table and index length is 63 bytes. */
+  @VisibleForTesting static final int MAX_NAME_LENGTH = 63;
+
+  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = 
"Metadata_%s_%s";
+  private static final String WATERMARK_INDEX_NAME_FORMAT = 
"WatermarkIdx_%s_%s";
+  private static final String CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT = 
"CreatedAtIdx_%s_%s";
+
+  /**
+   * Generates a unique name for the partition metadata table and its indexes. 
The table name will
+   * be in the form of {@code "Metadata_<databaseId>_<uuid>"}. The watermark 
index will be in the
+   * form of {@code "WatermarkIdx_<databaseId>_<uuid>}. The createdAt / start 
timestamp index will
+   * be in the form of {@code "CreatedAtIdx_<databaseId>_<uuid>}.
+   *
+   * @param databaseId The database id where the table will be created
+   * @return the unique generated names of the partition metadata ddl
+   */
+  public static PartitionMetadataTableNames generateRandom(String databaseId) {
+    UUID uuid = UUID.randomUUID();
+
+    String table = generateName(PARTITION_METADATA_TABLE_NAME_FORMAT, 
databaseId, uuid);
+    String watermarkIndex = generateName(WATERMARK_INDEX_NAME_FORMAT, 
databaseId, uuid);
+    String createdAtIndex =
+        generateName(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, 
uuid);
+
+    return new PartitionMetadataTableNames(table, watermarkIndex, 
createdAtIndex);
+  }
+
+  /**
+   * Encapsulates a selected table name. Index names are generated, but will 
only be used if the
+   * given table does not exist. The watermark index will be in the form of 
{@code
+   * "WatermarkIdx_<databaseId>_<uuid>}. The createdAt / start timestamp index 
will be in the form
+   * of {@code "CreatedAtIdx_<databaseId>_<uuid>}.
+   *
+   * @param databaseId The database id for the table
+   * @param table The table name to be used
+   * @return an instance with the table name and generated index names
+   */
+  public static PartitionMetadataTableNames fromExistingTable(String 
databaseId, String table) {
+    UUID uuid = UUID.randomUUID();
+
+    String watermarkIndex = generateName(WATERMARK_INDEX_NAME_FORMAT, 
databaseId, uuid);
+    String createdAtIndex =
+        generateName(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, 
uuid);
+    return new PartitionMetadataTableNames(table, watermarkIndex, 
createdAtIndex);
+  }
+
+  private static String generateName(String template, String databaseId, UUID 
uuid) {
+    String name = String.format(template, databaseId, uuid).replaceAll("-", 
"_");
+    if (name.length() > MAX_NAME_LENGTH) {
+      return name.substring(0, MAX_NAME_LENGTH);
+    }
+    return name;
+  }
+
+  private final String tableName;
+  private final String watermarkIndexName;
+  private final String createdAtIndexName;
+
+  public PartitionMetadataTableNames(
+      String tableName, String watermarkIndexName, String createdAtIndexName) {
+    this.tableName = tableName;
+    this.watermarkIndexName = watermarkIndexName;
+    this.createdAtIndexName = createdAtIndexName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String getWatermarkIndexName() {
+    return watermarkIndexName;
+  }
+
+  public String getCreatedAtIndexName() {
+    return createdAtIndexName;
+  }
+
+  @Override
+  public boolean equals(@Nullable Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof PartitionMetadataTableNames)) {
+      return false;
+    }
+    PartitionMetadataTableNames that = (PartitionMetadataTableNames) o;
+    return Objects.equals(tableName, that.tableName)
+        && Objects.equals(watermarkIndexName, that.watermarkIndexName)
+        && Objects.equals(createdAtIndexName, that.createdAtIndexName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tableName, watermarkIndexName, createdAtIndexName);
+  }
+
+  @Override
+  public String toString() {
+    return "PartitionMetadataTableNames{"
+        + "tableName='"
+        + tableName
+        + '\''
+        + ", watermarkIndexName='"
+        + watermarkIndexName
+        + '\''
+        + ", createdAtIndexName='"
+        + createdAtIndexName
+        + '\''
+        + '}';
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java
index a048c885a00..f8aa497292b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;
 
 import java.io.Serializable;
+import java.util.List;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 
@@ -33,6 +34,7 @@ public class CleanUpReadChangeStreamDoFn extends DoFn<byte[], 
Void> implements S
 
   @ProcessElement
   public void processElement(OutputReceiver<Void> receiver) {
-    daoFactory.getPartitionMetadataAdminDao().deletePartitionMetadataTable();
+    List<String> indexes = 
daoFactory.getPartitionMetadataDao().findAllTableIndexes();
+    
daoFactory.getPartitionMetadataAdminDao().deletePartitionMetadataTable(indexes);
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
index 387ffd603b1..ca93f34bf1b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
@@ -64,6 +64,7 @@ public class InitializeDoFn extends DoFn<byte[], 
PartitionMetadata> implements S
   public void processElement(OutputReceiver<PartitionMetadata> receiver) {
     PartitionMetadataDao partitionMetadataDao = 
daoFactory.getPartitionMetadataDao();
     if (!partitionMetadataDao.tableExists()) {
+      // Creates partition metadata table and associated indexes
       daoFactory.getPartitionMetadataAdminDao().createPartitionMetadataTable();
       createFakeParentPartition();
     }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java
deleted file mode 100644
index f15fc530737..00000000000
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner.changestreams;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-public class NameGeneratorTest {
-  private static final int MAXIMUM_POSTGRES_TABLE_NAME_LENGTH = 63;
-
-  @Test
-  public void testGenerateMetadataTableNameRemovesHyphens() {
-    final String tableName =
-        
NameGenerator.generatePartitionMetadataTableName("my-database-id-12345");
-    assertFalse(tableName.contains("-"));
-  }
-
-  @Test
-  public void testGenerateMetadataTableNameIsShorterThan64Characters() {
-    final String tableName =
-        
NameGenerator.generatePartitionMetadataTableName("my-database-id1-maximum-length");
-    assertTrue(tableName.length() <= MAXIMUM_POSTGRES_TABLE_NAME_LENGTH);
-  }
-}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
index 3752c2fb3af..02b9d111583 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java
@@ -33,7 +33,9 @@ import com.google.cloud.spanner.Dialect;
 import com.google.cloud.spanner.ErrorCode;
 import com.google.cloud.spanner.SpannerException;
 import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -58,6 +60,8 @@ public class PartitionMetadataAdminDaoTest {
   private static final String DATABASE_ID = "SPANNER_DATABASE";
 
   private static final String TABLE_NAME = "SPANNER_TABLE";
+  private static final String WATERMARK_INDEX_NAME = "WATERMARK_INDEX";
+  private static final String CREATED_AT_INDEX_NAME = "CREATED_AT_INDEX";
 
   private static final int TIMEOUT_MINUTES = 10;
 
@@ -68,12 +72,14 @@ public class PartitionMetadataAdminDaoTest {
   @Before
   public void setUp() {
     databaseAdminClient = mock(DatabaseAdminClient.class);
+    PartitionMetadataTableNames names =
+        new PartitionMetadataTableNames(TABLE_NAME, WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME);
     partitionMetadataAdminDao =
         new PartitionMetadataAdminDao(
-            databaseAdminClient, INSTANCE_ID, DATABASE_ID, TABLE_NAME, Dialect.GOOGLE_STANDARD_SQL);
+            databaseAdminClient, INSTANCE_ID, DATABASE_ID, names, Dialect.GOOGLE_STANDARD_SQL);
     partitionMetadataAdminDaoPostgres =
         new PartitionMetadataAdminDao(
-            databaseAdminClient, INSTANCE_ID, DATABASE_ID, TABLE_NAME, Dialect.POSTGRESQL);
+            databaseAdminClient, INSTANCE_ID, DATABASE_ID, names, Dialect.POSTGRESQL);
     op = (OperationFuture<Void, UpdateDatabaseDdlMetadata>) mock(OperationFuture.class);
     statements = ArgumentCaptor.forClass(Iterable.class);
     when(databaseAdminClient.updateDatabaseDdl(
@@ -89,9 +95,9 @@ public class PartitionMetadataAdminDaoTest {
         .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
     assertEquals(3, ((Collection<?>) statements.getValue()).size());
     Iterator<String> it = statements.getValue().iterator();
-    assertTrue(it.next().contains("CREATE TABLE"));
-    assertTrue(it.next().contains("CREATE INDEX"));
-    assertTrue(it.next().contains("CREATE INDEX"));
+    assertTrue(it.next().contains("CREATE TABLE IF NOT EXISTS"));
+    assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS"));
+    assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS"));
   }
 
   @Test
@@ -102,9 +108,9 @@ public class PartitionMetadataAdminDaoTest {
         .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
     assertEquals(3, ((Collection<?>) statements.getValue()).size());
     Iterator<String> it = statements.getValue().iterator();
-    assertTrue(it.next().contains("CREATE TABLE \""));
-    assertTrue(it.next().contains("CREATE INDEX \""));
-    assertTrue(it.next().contains("CREATE INDEX \""));
+    assertTrue(it.next().contains("CREATE TABLE IF NOT EXISTS \""));
+    assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS \""));
+    assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS \""));
   }
 
   @Test
@@ -133,7 +139,8 @@ public class PartitionMetadataAdminDaoTest {
   @Test
   public void testDeletePartitionMetadataTable() throws Exception {
     when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null);
-    partitionMetadataAdminDao.deletePartitionMetadataTable();
+    partitionMetadataAdminDao.deletePartitionMetadataTable(
+        Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME));
     verify(databaseAdminClient, times(1))
         .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
     assertEquals(3, ((Collection<?>) statements.getValue()).size());
@@ -143,10 +150,22 @@ public class PartitionMetadataAdminDaoTest {
     assertTrue(it.next().contains("DROP TABLE"));
   }
 
+  @Test
+  public void testDeletePartitionMetadataTableWithNoIndexes() throws Exception {
+    when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null);
+    partitionMetadataAdminDao.deletePartitionMetadataTable(Collections.emptyList());
+    verify(databaseAdminClient, times(1))
+        .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
+    assertEquals(1, ((Collection<?>) statements.getValue()).size());
+    Iterator<String> it = statements.getValue().iterator();
+    assertTrue(it.next().contains("DROP TABLE"));
+  }
+
   @Test
   public void testDeletePartitionMetadataTablePostgres() throws Exception {
     when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null);
-    partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable();
+    partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable(
+        Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME));
     verify(databaseAdminClient, times(1))
         .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
     assertEquals(3, ((Collection<?>) statements.getValue()).size());
@@ -156,11 +175,23 @@ public class PartitionMetadataAdminDaoTest {
     assertTrue(it.next().contains("DROP TABLE \""));
   }
 
+  @Test
+  public void testDeletePartitionMetadataTablePostgresWithNoIndexes() throws Exception {
+    when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null);
+    partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable(Collections.emptyList());
+    verify(databaseAdminClient, times(1))
+        .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
+    assertEquals(1, ((Collection<?>) statements.getValue()).size());
+    Iterator<String> it = statements.getValue().iterator();
+    assertTrue(it.next().contains("DROP TABLE \""));
+  }
+
   @Test
   public void testDeletePartitionMetadataTableWithTimeoutException() throws Exception {
     when(op.get(10, TimeUnit.MINUTES)).thenThrow(new TimeoutException(TIMED_OUT));
     try {
-      partitionMetadataAdminDao.deletePartitionMetadataTable();
+      partitionMetadataAdminDao.deletePartitionMetadataTable(
+          Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME));
       fail();
     } catch (SpannerException e) {
       assertTrue(e.getMessage().contains(TIMED_OUT));
@@ -171,7 +202,8 @@ public class PartitionMetadataAdminDaoTest {
   public void testDeletePartitionMetadataTableWithInterruptedException() throws Exception {
     when(op.get(10, TimeUnit.MINUTES)).thenThrow(new InterruptedException(INTERRUPTED));
     try {
-      partitionMetadataAdminDao.deletePartitionMetadataTable();
+      partitionMetadataAdminDao.deletePartitionMetadataTable(
+          Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME));
       fail();
     } catch (SpannerException e) {
       assertEquals(ErrorCode.CANCELLED, e.getErrorCode());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java
new file mode 100644
index 00000000000..2aae5b26a2c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames.MAX_NAME_LENGTH;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class PartitionMetadataTableNamesTest {
+  @Test
+  public void testGeneratePartitionMetadataNamesRemovesHyphens() {
+    String databaseId = "my-database-id-12345";
+
+    PartitionMetadataTableNames names1 = PartitionMetadataTableNames.generateRandom(databaseId);
+    assertFalse(names1.getTableName().contains("-"));
+    assertFalse(names1.getWatermarkIndexName().contains("-"));
+    assertFalse(names1.getCreatedAtIndexName().contains("-"));
+
+    PartitionMetadataTableNames names2 = PartitionMetadataTableNames.generateRandom(databaseId);
+    assertNotEquals(names1.getTableName(), names2.getTableName());
+    assertNotEquals(names1.getWatermarkIndexName(), names2.getWatermarkIndexName());
+    assertNotEquals(names1.getCreatedAtIndexName(), names2.getCreatedAtIndexName());
+  }
+
+  @Test
+  public void testGeneratePartitionMetadataNamesIsShorterThan64Characters() {
+    PartitionMetadataTableNames names =
+        PartitionMetadataTableNames.generateRandom(
+            "my-database-id-larger-than-maximum-length-1234567890-1234567890-1234567890");
+    assertTrue(names.getTableName().length() <= MAX_NAME_LENGTH);
+    assertTrue(names.getWatermarkIndexName().length() <= MAX_NAME_LENGTH);
+    assertTrue(names.getCreatedAtIndexName().length() <= MAX_NAME_LENGTH);
+
+    names = PartitionMetadataTableNames.generateRandom("d");
+    assertTrue(names.getTableName().length() <= MAX_NAME_LENGTH);
+    assertTrue(names.getWatermarkIndexName().length() <= MAX_NAME_LENGTH);
+    assertTrue(names.getCreatedAtIndexName().length() <= MAX_NAME_LENGTH);
+  }
+
+  @Test
+  public void testPartitionMetadataNamesFromExistingTable() {
+    PartitionMetadataTableNames names1 =
+        PartitionMetadataTableNames.fromExistingTable("databaseid", "mytable");
+    assertEquals("mytable", names1.getTableName());
+    assertFalse(names1.getWatermarkIndexName().contains("-"));
+    assertFalse(names1.getCreatedAtIndexName().contains("-"));
+
+    PartitionMetadataTableNames names2 =
+        PartitionMetadataTableNames.fromExistingTable("databaseid", "mytable");
+    assertEquals("mytable", names2.getTableName());
+    assertNotEquals(names1.getWatermarkIndexName(), names2.getWatermarkIndexName());
+    assertNotEquals(names1.getCreatedAtIndexName(), names2.getCreatedAtIndexName());
+  }
+}


Reply via email to