This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
new ab554be189 PHOENIX-7718 : CDC Stream improvements (#2308) (#2311)
ab554be189 is described below
commit ab554be189e0a6a6506f50bbf5308f08c52b13fd
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Oct 30 15:53:05 2025 -0700
PHOENIX-7718 : CDC Stream improvements (#2308) (#2311)
Co-authored-by: Palash Chauhan <[email protected]>
---
.../phoenix/query/ConnectionQueryServices.java | 3 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 25 ++++++
.../query/ConnectionlessQueryServicesImpl.java | 7 ++
.../query/DelegateConnectionQueryServices.java | 7 ++
.../org/apache/phoenix/schema/MetaDataClient.java | 38 +++------
.../main/java/org/apache/phoenix/util/CDCUtil.java | 2 +-
.../tasks/CdcStreamPartitionMetadataTask.java | 3 +
.../apache/phoenix/end2end/CDCDefinitionIT.java | 99 ++++++++++++++++++++--
.../org/apache/phoenix/end2end/CDCStreamIT.java | 28 ++++++
.../org/apache/phoenix/end2end/TableTTLIT.java | 2 +-
.../phoenix/query/MetaDataCacheMetricsIT.java | 4 +-
11 files changed, 182 insertions(+), 36 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index fe38911558..45340d88d5 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.query;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@@ -314,4 +315,6 @@ public interface ConnectionQueryServices extends
QueryServices, MetaDataMutated
void
invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest>
requests)
throws Throwable;
+
+ void deleteAllStreamMetadataForTable(Connection conn, String tableName)
throws SQLException;
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 16d75d8b54..311a16e49f 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -56,6 +56,8 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_COLUMN_NAME_BYTES;
@@ -6909,6 +6911,29 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices
}
}
+ /*
+ * Delete any metadata related to this table in the System tables for Streams.
+ */
+ @Override
+ public void deleteAllStreamMetadataForTable(java.sql.Connection conn, String
tableName)
+ throws SQLException {
+ String deleteStreamStatusQuery =
+ "DELETE FROM " + SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME = ?";
+ String deleteStreamPartitionsQuery =
+ "DELETE FROM " + SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = ?";
+ LOGGER.info("Deleting Stream Metadata for table {}", tableName);
+ try (PreparedStatement ps =
conn.prepareStatement(deleteStreamStatusQuery)) {
+ ps.setString(1, tableName);
+ ps.executeUpdate();
+ conn.commit();
+ }
+ try (PreparedStatement ps =
conn.prepareStatement(deleteStreamPartitionsQuery)) {
+ ps.setString(1, tableName);
+ ps.executeUpdate();
+ conn.commit();
+ }
+ }
+
/**
* Invalidate metadata cache on all regionservers with retries for the given
list of
* InvalidateServerMetadataCacheRequest. Each
InvalidateServerMetadataCacheRequest contains
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 5ca4c1592d..ad5df1e39e 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
@@ -899,4 +900,10 @@ public class ConnectionlessQueryServicesImpl extends
DelegateQueryServices
throws Throwable {
// No-op
}
+
+ @Override
+ public void deleteAllStreamMetadataForTable(Connection conn, String
tableName)
+ throws SQLException {
+ // No-op
+ }
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 622c158bff..fdb78faa00 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.query;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@@ -469,4 +470,10 @@ public class DelegateConnectionQueryServices extends
DelegateQueryServices
throws Throwable {
getDelegate().invalidateServerMetadataCache(requests);
}
+
+ @Override
+ public void deleteAllStreamMetadataForTable(Connection conn, String
tableName)
+ throws SQLException {
+ getDelegate().deleteAllStreamMetadataForTable(conn, tableName);
+ }
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index fdaeed259d..f5be406566 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -90,7 +90,6 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NA
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYNC_INDEX_CREATED_DATE;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
-import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAMESPACE_BYTES;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
@@ -4063,10 +4062,12 @@ public class MetaDataClient {
String parentTableName = statement.getTableName().getTableName();
String indexName =
CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
// Mark CDC Stream as Disabled
- long cdcIndexTimestamp = connection.getTable(indexName).getTimeStamp();
- String streamName = String.format(CDC_STREAM_NAME_FORMAT, parentTableName,
cdcTableName,
+ long cdcIndexTimestamp =
+ connection.getTable(SchemaUtil.getTableName(schemaName,
indexName)).getTimeStamp();
+ String fullParentTableName = SchemaUtil.getTableName(schemaName,
parentTableName);
+ String streamName = String.format(CDC_STREAM_NAME_FORMAT,
fullParentTableName, cdcTableName,
cdcIndexTimestamp, CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp));
- markCDCStreamStatus(parentTableName, streamName,
CDCUtil.CdcStreamStatus.DISABLED);
+ markCDCStreamStatus(fullParentTableName, streamName,
CDCUtil.CdcStreamStatus.DISABLED);
// Dropping the virtual CDC Table
dropTable(schemaName, cdcTableName, parentTableName, PTableType.CDC,
statement.ifExists(),
false, false);
@@ -4081,24 +4082,6 @@ public class MetaDataClient {
}
}
- private void deleteAllStreamMetadataForTable(String tableName) throws
SQLException {
- String deleteStreamStatusQuery =
- "DELETE FROM " + SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME = ?";
- String deleteStreamPartitionsQuery =
- "DELETE FROM " + SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = ?";
- LOGGER.info("Deleting Stream Metadata for table {}", tableName);
- try (PreparedStatement ps =
connection.prepareStatement(deleteStreamStatusQuery)) {
- ps.setString(1, tableName);
- ps.executeUpdate();
- connection.commit();
- }
- try (PreparedStatement ps =
connection.prepareStatement(deleteStreamPartitionsQuery)) {
- ps.setString(1, tableName);
- ps.executeUpdate();
- connection.commit();
- }
- }
-
private void markCDCStreamStatus(String tableName, String streamName,
CDCUtil.CdcStreamStatus status) throws SQLException {
String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME +
" VALUES (?, ?, ?)";
@@ -4161,8 +4144,9 @@ public class MetaDataClient {
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
try {
PTable ptable = connection.getTable(fullTableName);
- if (PTableType.TABLE.equals(ptable.getType()) &&
CDCUtil.hasCDCIndex(ptable)) {
- deleteAllStreamMetadataForTable(fullTableName);
+ if (PTableType.TABLE.equals(ptable.getType())) {
+ connection.unwrap(PhoenixConnection.class).getQueryServices()
+ .deleteAllStreamMetadataForTable(connection, fullTableName);
}
if (
parentTableName != null &&
!parentTableName.equals(ptable.getParentTableName().getString())
@@ -4745,9 +4729,9 @@ public class MetaDataClient {
/**
* To check if TTL is defined at any of the child below we are
checking it at
* {@link
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List,
ColumnMutator, int, PTable, PTable, boolean)}
- * level where in function
- * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[],
byte[], byte[], List, int)}
- * we are already traversing through allDescendantViews.
+ * level where in function {@link
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
+ * validateIfMutationAllowedOnParent(PTable, List, PTableType, long,
byte[], byte[],
+ * byte[], List, int)} we are already traversing through
allDescendantViews.
*/
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index f25655ad76..68c9b48aa7 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -89,7 +89,7 @@ public class CDCUtil {
}
public static String getCDCIndexName(String cdcName) {
- return CDC_INDEX_PREFIX + SchemaUtil.getTableNameFromFullName(cdcName);
+ return CDC_INDEX_PREFIX + cdcName;
}
public static boolean isCDCIndex(String indexName) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
index 36295c6c74..9179588e22 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
@@ -31,6 +31,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -70,6 +71,8 @@ public class CdcStreamPartitionMetadataTask extends BaseTask {
try {
pconn =
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+ // clear table region cache to avoid stale data
+
pconn.getQueryServices().clearTableRegionCache(TableName.valueOf(tableName));
List<HRegionLocation> tableRegions =
pconn.getQueryServices().getAllTableRegions(tableName.getBytes(),
getTableRegionsTimeout);
upsertPartitionMetadata(pconn, tableName, streamName, tableRegions);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index a0ca8c26e3..0aee3c9ab1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
import static
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -31,6 +32,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.CDCUtil;
@@ -258,30 +260,115 @@ public class CDCDefinitionIT extends CDCBaseIT {
assertEquals(indexTable.getEncodingScheme(), NON_ENCODED_QUALIFIERS);
}
+ @Test
+ public void testIndexNameAfterCreateCDC() throws Exception {
+ Properties props = new Properties();
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String schemaName = generateUniqueName();
+ String tableName =
+ (generateUniqueName() + "." + generateUniqueName() + "." +
generateUniqueName())
+ .toLowerCase();
+ String fullTableName =
+ SchemaUtil.getTableName(schemaName,
SchemaUtil.getEscapedArgument(tableName));
+ String viewName =
+ (generateUniqueName() + "." + generateUniqueName() + "." +
generateUniqueName())
+ .toLowerCase();
+ String cdcName = "CDC_" + tableName;
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + " ( k
INTEGER PRIMARY KEY,"
+ + " v1 INTEGER," + " v2 DATE)");
+ if (forView) {
+ String fullViewName =
+ SchemaUtil.getTableName(schemaName,
SchemaUtil.getEscapedArgument(viewName));
+ conn.createStatement()
+ .execute("CREATE VIEW " + fullViewName + " AS SELECT * FROM " +
fullTableName);
+ fullTableName = fullViewName;
+ }
+ conn.createStatement().execute("CREATE CDC \"" + cdcName + "\" ON " +
fullTableName);
+ PTable ptable = conn.unwrap(PhoenixConnection.class)
+ .getTableNoCache(SchemaUtil.getTableName(schemaName, forView ? viewName
: tableName));
+ for (PTable index : ptable.getIndexes()) {
+ if (CDCUtil.isCDCIndex(index.getTableName().getString())) {
+ assertEquals(CDCUtil.getCDCIndexName(cdcName),
index.getTableName().getString());
+ assertFalse(index.getTableName().getString().contains(schemaName));
+ break;
+ }
+ }
+ }
+
@Test
public void testDropCDC() throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
- conn.createStatement().execute(
- "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1
INTEGER," + " v2 DATE)");
+ String schemaName = null;
+ String viewName = forView ? generateUniqueName() : null;
String cdcName = generateUniqueName();
- String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+ testDropCDCHelper(conn, schemaName, tableName, viewName, cdcName);
+ }
+
+ @Test
+ public void testDropCDCWithSchema() throws SQLException {
+ Properties props = new Properties();
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String schemaName = generateUniqueName();
+ String viewName = forView ? generateUniqueName() : null;
+ String cdcName = generateUniqueName();
+ testDropCDCHelper(conn, schemaName, tableName, viewName, cdcName);
+ }
+
+ @Test
+ public void testDropCDCWithAllCaseSensitiveNames() throws SQLException {
+ Properties props = new Properties();
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName =
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase());
+ String schemaName =
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase());
+ String viewName =
+ forView ?
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase()) : null;
+ String cdcName =
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase());
+ testDropCDCHelper(conn, schemaName, tableName, viewName, cdcName);
+ }
+
+ @Test
+ public void testDropCDCWithCaseSensitiveTableName() throws SQLException {
+ Properties props = new Properties();
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName =
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase());
+ String schemaName = generateUniqueName();
+ String viewName =
+ forView ?
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase()) : null;
+ String cdcName =
SchemaUtil.getEscapedArgument(generateUniqueName().toLowerCase());
+ testDropCDCHelper(conn, schemaName, tableName, viewName, cdcName);
+ }
+
+ private void testDropCDCHelper(Connection conn, String schemaName, String
tableName,
+ String viewName, String cdcName) throws SQLException {
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + " ( k
INTEGER PRIMARY KEY,"
+ + " v1 INTEGER," + " v2 DATE) TTL=100");
+ if (viewName != null) {
+ String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+ conn.createStatement()
+ .execute("CREATE VIEW " + fullViewName + " AS SELECT * FROM " +
fullTableName);
+ fullTableName = fullViewName;
+ }
+ String cdc_sql = "CREATE CDC " + cdcName + " ON " + fullTableName;
conn.createStatement().execute(cdc_sql);
- String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+ String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + fullTableName;
conn.createStatement().execute(drop_cdc_sql);
+ cdcName = SchemaUtil.getUnEscapedFullName(cdcName);
try (ResultSet rs = conn.createStatement()
.executeQuery("SELECT cdc_include FROM " + "system.catalog WHERE
table_name = '" + cdcName
+ "' AND column_name IS NULL and column_family IS NULL")) {
- assertEquals(false, rs.next());
+ assertFalse(rs.next());
}
try (ResultSet rs = conn.createStatement()
.executeQuery("SELECT index_type FROM " + "system.catalog WHERE
table_name = '"
+ CDCUtil.getCDCIndexName(cdcName)
+ "' AND column_name IS NULL and column_family IS NULL")) {
- assertEquals(false, rs.next());
+ assertFalse(rs.next());
}
try {
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index 964e489d6c..cb805bfccc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -433,6 +433,34 @@ public class CDCStreamIT extends CDCBaseIT {
Assert.assertTrue(rs.next());
}
+ @Test
+ public void testStreamMetadataWhenCDCIsDropped() throws SQLException {
+ Connection conn = newConnection();
+ MetaDataClient mdc = new
MetaDataClient(conn.unwrap(PhoenixConnection.class));
+ String schemaName = "\"" + generateUniqueName().toLowerCase() + "\"";
+ String tableName =
+ SchemaUtil.getTableName(schemaName, "\"" +
generateUniqueName().toLowerCase() + "\"");
+ String unescapedFullTableName = SchemaUtil.getUnEscapedFullName(tableName);
+ String create_table_sql =
+ "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1
INTEGER, v2 DATE)";
+ conn.createStatement().execute(create_table_sql);
+ String cdcName = "\"" + generateUniqueName().toLowerCase() + "\"";
+ String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+ conn.createStatement().execute(cdc_sql);
+ TaskRegionObserver.SelfHealingTask task = new
TaskRegionObserver.SelfHealingTask(
+ taskRegionEnvironment,
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+ task.run();
+ String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+
Assert.assertNotNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+ // check if stream metadata is cleared when cdc is dropped
+ conn.createStatement().execute(drop_cdc_sql);
+ Assert.assertNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+ // should be able to re-create cdc with same name and metadata should be populated
+ conn.createStatement().execute(cdc_sql);
+
Assert.assertNotNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+ task.run();
+ }
+
@Test
public void testCDCStreamTTL() throws Exception {
Connection conn = newConnection();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index b7f481d118..dcefcb4866 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -656,7 +656,7 @@ public class TableTTLIT extends BaseTest {
conn.createStatement().execute(cdcSql);
conn.commit();
- String cdcIndexName = schemaName + "." +
CDCUtil.getCDCIndexName(schemaName + "." + cdcName);
+ String cdcIndexName = schemaName + "." +
CDCUtil.getCDCIndexName(cdcName);
String cdcFullName = SchemaUtil.getTableName(null, schemaName + "." +
cdcName);
PTable cdcIndex = ((PhoenixConnection)
conn).getTableNoCache(cdcIndexName);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCacheMetricsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCacheMetricsIT.java
index c1988f7ab8..95cb5a0245 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCacheMetricsIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/query/MetaDataCacheMetricsIT.java
@@ -69,7 +69,9 @@ public class MetaDataCacheMetricsIT extends MetaDataCachingIT
{
currEstimatedUsedCacheSize =
GlobalClientMetrics.GLOBAL_CLIENT_METADATA_CACHE_ESTIMATED_USED_SIZE.getMetric().getValue();
- assertEquals("Incorrect number of client metadata cache removals",
prevCacheRemovalCount + 1,
+ // there are some client cache updates for SYSTEM.CDC_STREAM when we clear stream metadata as
+ // part of drop table
+ assertEquals("Incorrect number of client metadata cache removals",
prevCacheRemovalCount + 3,
GlobalClientMetrics.GLOBAL_CLIENT_METADATA_CACHE_REMOVAL_COUNTER.getMetric().getValue());
assertTrue(
String.format(