This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 22b49d05b1 PHOENIX-7718 : CDC Stream improvements (#2308)
22b49d05b1 is described below

commit 22b49d05b1db977aa75669570450de3646a13e46
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Oct 30 15:45:36 2025 -0700

    PHOENIX-7718 : CDC Stream improvements (#2308)
    
    Co-authored-by: Palash Chauhan <[email protected]>
---
 .../phoenix/query/ConnectionQueryServices.java     |  3 +
 .../phoenix/query/ConnectionQueryServicesImpl.java | 25 ++++++
 .../query/ConnectionlessQueryServicesImpl.java     |  7 ++
 .../query/DelegateConnectionQueryServices.java     |  7 ++
 .../org/apache/phoenix/schema/MetaDataClient.java  | 38 +++------
 .../main/java/org/apache/phoenix/util/CDCUtil.java |  2 +-
 .../tasks/CdcStreamPartitionMetadataTask.java      |  3 +
 .../apache/phoenix/end2end/CDCDefinitionIT.java    | 99 ++++++++++++++++++++--
 .../org/apache/phoenix/end2end/CDCStreamIT.java    | 28 ++++++
 .../org/apache/phoenix/end2end/TableTTLIT.java     |  2 +-
 .../phoenix/query/MetaDataCacheMetricsIT.java      |  4 +-
 11 files changed, 182 insertions(+), 36 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index fe38911558..45340d88d5 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.query;
 
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
@@ -314,4 +315,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
 
   void 
invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> 
requests)
     throws Throwable;
+
+  void deleteAllStreamMetadataForTable(Connection conn, String tableName) 
throws SQLException;
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 16d75d8b54..311a16e49f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -56,6 +56,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_COLUMN_NAME_BYTES;
@@ -6909,6 +6911,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
     }
   }
 
+  /*
+   * Delete any metadata related to this table in the System tables for 
Streams.
+   */
+  @Override
+  public void deleteAllStreamMetadataForTable(java.sql.Connection conn, String 
tableName)
+    throws SQLException {
+    String deleteStreamStatusQuery =
+      "DELETE FROM " + SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME = ?";
+    String deleteStreamPartitionsQuery =
+      "DELETE FROM " + SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = ?";
+    LOGGER.info("Deleting Stream Metadata for table {}", tableName);
+    try (PreparedStatement ps = 
conn.prepareStatement(deleteStreamStatusQuery)) {
+      ps.setString(1, tableName);
+      ps.executeUpdate();
+      conn.commit();
+    }
+    try (PreparedStatement ps = 
conn.prepareStatement(deleteStreamPartitionsQuery)) {
+      ps.setString(1, tableName);
+      ps.executeUpdate();
+      conn.commit();
+    }
+  }
+
   /**
    * Invalidate metadata cache on all regionservers with retries for the given 
list of
    * InvalidateServerMetadataCacheRequest. Each 
InvalidateServerMetadataCacheRequest contains
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 5ca4c1592d..ad5df1e39e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -899,4 +900,10 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices
     throws Throwable {
     // No-op
   }
+
+  @Override
+  public void deleteAllStreamMetadataForTable(Connection conn, String 
tableName)
+    throws SQLException {
+    // No-op
+  }
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 622c158bff..fdb78faa00 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.query;
 
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
@@ -469,4 +470,10 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices
     throws Throwable {
     getDelegate().invalidateServerMetadataCache(requests);
   }
+
+  @Override
+  public void deleteAllStreamMetadataForTable(Connection conn, String 
tableName)
+    throws SQLException {
+    getDelegate().deleteAllStreamMetadataForTable(conn, tableName);
+  }
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index fdaeed259d..f5be406566 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -90,7 +90,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NA
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYNC_INDEX_CREATED_DATE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAMESPACE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
@@ -4063,10 +4062,12 @@ public class MetaDataClient {
     String parentTableName = statement.getTableName().getTableName();
     String indexName = 
CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
     // Mark CDC Stream as Disabled
-    long cdcIndexTimestamp = connection.getTable(indexName).getTimeStamp();
-    String streamName = String.format(CDC_STREAM_NAME_FORMAT, parentTableName, 
cdcTableName,
+    long cdcIndexTimestamp =
+      connection.getTable(SchemaUtil.getTableName(schemaName, 
indexName)).getTimeStamp();
+    String fullParentTableName = SchemaUtil.getTableName(schemaName, 
parentTableName);
+    String streamName = String.format(CDC_STREAM_NAME_FORMAT, 
fullParentTableName, cdcTableName,
       cdcIndexTimestamp, CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp));
-    markCDCStreamStatus(parentTableName, streamName, 
CDCUtil.CdcStreamStatus.DISABLED);
+    markCDCStreamStatus(fullParentTableName, streamName, 
CDCUtil.CdcStreamStatus.DISABLED);
     // Dropping the virtual CDC Table
     dropTable(schemaName, cdcTableName, parentTableName, PTableType.CDC, 
statement.ifExists(),
       false, false);
@@ -4081,24 +4082,6 @@ public class MetaDataClient {
     }
   }
 
-  private void deleteAllStreamMetadataForTable(String tableName) throws 
SQLException {
-    String deleteStreamStatusQuery =
-      "DELETE FROM " + SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME = ?";
-    String deleteStreamPartitionsQuery =
-      "DELETE FROM " + SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = ?";
-    LOGGER.info("Deleting Stream Metadata for table {}", tableName);
-    try (PreparedStatement ps = 
connection.prepareStatement(deleteStreamStatusQuery)) {
-      ps.setString(1, tableName);
-      ps.executeUpdate();
-      connection.commit();
-    }
-    try (PreparedStatement ps = 
connection.prepareStatement(deleteStreamPartitionsQuery)) {
-      ps.setString(1, tableName);
-      ps.executeUpdate();
-      connection.commit();
-    }
-  }
-
   private void markCDCStreamStatus(String tableName, String streamName,
     CDCUtil.CdcStreamStatus status) throws SQLException {
     String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + 
" VALUES (?, ?, ?)";
@@ -4161,8 +4144,9 @@ public class MetaDataClient {
     String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
     try {
       PTable ptable = connection.getTable(fullTableName);
-      if (PTableType.TABLE.equals(ptable.getType()) && 
CDCUtil.hasCDCIndex(ptable)) {
-        deleteAllStreamMetadataForTable(fullTableName);
+      if (PTableType.TABLE.equals(ptable.getType())) {
+        connection.unwrap(PhoenixConnection.class).getQueryServices()
+          .deleteAllStreamMetadataForTable(connection, fullTableName);
       }
       if (
         parentTableName != null && 
!parentTableName.equals(ptable.getParentTableName().getString())
@@ -4745,9 +4729,9 @@ public class MetaDataClient {
           /**
            * To check if TTL is defined at any of the child below we are 
checking it at
            * {@link 
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List, 
ColumnMutator, int, PTable, PTable, boolean)}
-           * level where in function
-           * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# 
validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], 
byte[], byte[], List, int)}
-           * we are already traversing through allDescendantViews.
+           * level where in function {@link 
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
+           * validateIfMutationAllowedOnParent(PTable, List, PTableType, long, 
byte[], byte[],
+           * byte[], List, int)} we are already traversing through 
allDescendantViews.
            */
         }
 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index f25655ad76..68c9b48aa7 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -89,7 +89,7 @@ public class CDCUtil {
   }
 
   public static String getCDCIndexName(String cdcName) {
-    return CDC_INDEX_PREFIX + SchemaUtil.getTableNameFromFullName(cdcName);
+    return CDC_INDEX_PREFIX + cdcName;
   }
 
   public static boolean isCDCIndex(String indexName) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
index 36295c6c74..9179588e22 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
@@ -31,6 +31,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -70,6 +71,8 @@ public class CdcStreamPartitionMetadataTask extends BaseTask {
     try {
       pconn =
         
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+      // clear table region cache to avoid stale data
+      
pconn.getQueryServices().clearTableRegionCache(TableName.valueOf(tableName));
       List<HRegionLocation> tableRegions =
         pconn.getQueryServices().getAllTableRegions(tableName.getBytes(), 
getTableRegionsTimeout);
       upsertPartitionMetadata(pconn, tableName, streamName, tableRegions);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index a0ca8c26e3..0aee3c9ab1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -31,6 +32,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.CDCUtil;
@@ -258,30 +260,115 @@ public class CDCDefinitionIT extends CDCBaseIT {
     assertEquals(indexTable.getEncodingScheme(), NON_ENCODED_QUALIFIERS);
   }
 
+  @Test
+  public void testIndexNameAfterCreateCDC() throws Exception {
+    Properties props = new Properties();
+    Connection conn = DriverManager.getConnection(getUrl(), props);
+    String schemaName = generateUniqueName();
+    String tableName =
+      (generateUniqueName() + "." + generateUniqueName() + "." + 
generateUniqueName())
+        .toLowerCase();
+    String fullTableName =
+      SchemaUtil.getTableName(schemaName, 
SchemaUtil.getEscapedArgument(tableName));
+    String viewName =
+      (generateUniqueName() + "." + generateUniqueName() + "." + 
generateUniqueName())
+        .toLowerCase();
+    String cdcName = "CDC_" + tableName;
+    conn.createStatement().execute("CREATE TABLE  " + fullTableName + " ( k 
INTEGER PRIMARY KEY,"
+      + " v1 INTEGER," + " v2 DATE)");
+    if (forView) {
+      String fullViewName =
+        SchemaUtil.getTableName(schemaName, 
SchemaUtil.getEscapedArgument(viewName));
+      conn.createStatement()
+        .execute("CREATE VIEW " + fullViewName + " AS SELECT * FROM " + 
fullTableName);
+      fullTableName = fullViewName;
+    }
+    conn.createStatement().execute("CREATE CDC \"" + cdcName + "\" ON " + 
fullTableName);
+    PTable ptable = conn.unwrap(PhoenixConnection.class)
+      .getTableNoCache(SchemaUtil.getTableName(schemaName, forView ? viewName 
: tableName));
+    for (PTable index : ptable.getIndexes()) {
+      if (CDCUtil.isCDCIndex(index.getTableName().getString())) {
+        assertEquals(CDCUtil.getCDCIndexName(cdcName), 
index.getTableName().getString());
+        assertFalse(index.getTableName().getString().contains(schemaName));
+        break;
+      }
+    }
+  }
+
   @Test
   public void testDropCDC() throws SQLException {
     Properties props = new Properties();
     Connection conn = DriverManager.getConnection(getUrl(), props);
     String tableName = generateUniqueName();
-    conn.createStatement().execute(
-      "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 
INTEGER," + " v2 DATE)");
+    String schemaName = null;
+    String viewName = forView ? generateUniqueName() : null;
     String cdcName = generateUniqueName();
-    String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+    testDropCDCHelper(conn, schemaName, tableName, viewName, cdcName);
+  }
+
+  @Test
+  public void testDropCDCWithSchema() throws SQLException {
+    Properties props = new Properties();
+    Connection conn = DriverManager.getConnection(getUrl(), props);
+    String tableName = generateUniqueName();
+    String schemaName = generateUniqueName();
+    String viewName = forView ? generateUniqueName() : null;
+    String cdcName = generateUniqueName();
+    testDropCDCHelper(conn, schemaName, tableName, viewName, cdcName);
+  }
+
+  @Test
+  public void testDropCDCWithAllCaseSensitiveNames() throws SQLException {
+    Properties props = new Properties();
+    Connection conn = DriverManager.getConnection(getUrl(), props);
+    String tableName = 
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase());
+    String schemaName = 
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase());
+    String viewName =
+      forView ? 
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase()) : null;
+    String cdcName = 
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase());
+    testDropCDCHelper(conn, schemaName, tableName, viewName, cdcName);
+  }
+
+  @Test
+  public void testDropCDCWithCaseSensitiveTableName() throws SQLException {
+    Properties props = new Properties();
+    Connection conn = DriverManager.getConnection(getUrl(), props);
+    String tableName = 
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase());
+    String schemaName = generateUniqueName();
+    String viewName =
+      forView ? 
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase()) : null;
+    String cdcName = 
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase());
+    testDropCDCHelper(conn, schemaName, tableName, viewName, cdcName);
+  }
+
+  private void testDropCDCHelper(Connection conn, String schemaName, String 
tableName,
+    String viewName, String cdcName) throws SQLException {
+    String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+    conn.createStatement().execute("CREATE TABLE  " + fullTableName + " ( k 
INTEGER PRIMARY KEY,"
+      + " v1 INTEGER," + " v2 DATE) TTL=100");
+    if (viewName != null) {
+      String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+      conn.createStatement()
+        .execute("CREATE VIEW " + fullViewName + " AS SELECT * FROM " + 
fullTableName);
+      fullTableName = fullViewName;
+    }
+    String cdc_sql = "CREATE CDC " + cdcName + " ON " + fullTableName;
     conn.createStatement().execute(cdc_sql);
 
-    String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+    String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + fullTableName;
     conn.createStatement().execute(drop_cdc_sql);
 
+    cdcName = SchemaUtil.getUnEscapedFullName(cdcName);
     try (ResultSet rs = conn.createStatement()
       .executeQuery("SELECT cdc_include FROM " + "system.catalog WHERE 
table_name = '" + cdcName
         + "' AND column_name IS NULL and column_family IS NULL")) {
-      assertEquals(false, rs.next());
+      assertFalse(rs.next());
     }
     try (ResultSet rs = conn.createStatement()
       .executeQuery("SELECT index_type FROM " + "system.catalog WHERE 
table_name = '"
         + CDCUtil.getCDCIndexName(cdcName)
         + "' AND column_name IS NULL and column_family IS NULL")) {
-      assertEquals(false, rs.next());
+      assertFalse(rs.next());
     }
 
     try {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index 964e489d6c..cb805bfccc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -433,6 +433,34 @@ public class CDCStreamIT extends CDCBaseIT {
     Assert.assertTrue(rs.next());
   }
 
+  @Test
+  public void testStreamMetadataWhenCDCIsDropped() throws SQLException {
+    Connection conn = newConnection();
+    MetaDataClient mdc = new 
MetaDataClient(conn.unwrap(PhoenixConnection.class));
+    String schemaName = "\"" + generateUniqueName().toLowerCase() + "\"";
+    String tableName =
+      SchemaUtil.getTableName(schemaName, "\"" + 
generateUniqueName().toLowerCase() + "\"");
+    String unescapedFullTableName = SchemaUtil.getUnEscapedFullName(tableName);
+    String create_table_sql =
+      "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 
INTEGER, v2 DATE)";
+    conn.createStatement().execute(create_table_sql);
+    String cdcName = "\"" + generateUniqueName().toLowerCase() + "\"";
+    String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+    conn.createStatement().execute(cdc_sql);
+    TaskRegionObserver.SelfHealingTask task = new 
TaskRegionObserver.SelfHealingTask(
+      taskRegionEnvironment, 
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+    task.run();
+    String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+    
Assert.assertNotNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+    // check if stream metadata is cleared when cdc is dropped
+    conn.createStatement().execute(drop_cdc_sql);
+    Assert.assertNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+    // should be able to re-create cdc with same name and metadata should be 
populated
+    conn.createStatement().execute(cdc_sql);
+    
Assert.assertNotNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+    task.run();
+  }
+
   @Test
   public void testCDCStreamTTL() throws Exception {
     Connection conn = newConnection();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index b7f481d118..dcefcb4866 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -656,7 +656,7 @@ public class TableTTLIT extends BaseTest {
       conn.createStatement().execute(cdcSql);
       conn.commit();
 
-      String cdcIndexName = schemaName + "." + 
CDCUtil.getCDCIndexName(schemaName + "." + cdcName);
+      String cdcIndexName = schemaName + "." + 
CDCUtil.getCDCIndexName(cdcName);
       String cdcFullName = SchemaUtil.getTableName(null, schemaName + "." + 
cdcName);
 
       PTable cdcIndex = ((PhoenixConnection) 
conn).getTableNoCache(cdcIndexName);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCacheMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCacheMetricsIT.java
index c1988f7ab8..95cb5a0245 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCacheMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCacheMetricsIT.java
@@ -69,7 +69,9 @@ public class MetaDataCacheMetricsIT extends MetaDataCachingIT {
 
       currEstimatedUsedCacheSize =
         
GlobalClientMetrics.GLOBAL_CLIENT_METADATA_CACHE_ESTIMATED_USED_SIZE.getMetric().getValue();
-      assertEquals("Incorrect number of client metadata cache removals", 
prevCacheRemovalCount + 1,
+      // there are some client cache updates for SYSTEM.CDC_STREAM when we 
clear stream metadata as
+      // part of drop table
+      assertEquals("Incorrect number of client metadata cache removals", 
prevCacheRemovalCount + 3,
         
GlobalClientMetrics.GLOBAL_CLIENT_METADATA_CACHE_REMOVAL_COUNTER.getMetric().getValue());
       assertTrue(
         String.format(

Reply via email to