This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 90d3e16c68 PHOENIX-7644 : CDC Stream improvements (#2196)
90d3e16c68 is described below

commit 90d3e16c687617b05f0a789afb8fd278ca51416e
Author: Palash Chauhan <palashc...@gmail.com>
AuthorDate: Fri Jun 20 10:26:24 2025 -0700

    PHOENIX-7644 : CDC Stream improvements (#2196)
    
    Co-authored-by: Palash Chauhan 
<p.chau...@pchauha-ltmgv47.internal.salesforce.com>
---
 .../org/apache/phoenix/query/QueryConstants.java   |   6 +-
 .../main/java/org/apache/phoenix/util/CDCUtil.java |  11 +-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |   5 +
 .../phoenix/coprocessor/PhoenixMasterObserver.java | 155 +++++++++++----------
 .../apache/phoenix/end2end/CDCDefinitionIT.java    |  67 ++++++++-
 5 files changed, 164 insertions(+), 80 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 02c27096a0..9911f5f0a3 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -663,7 +663,8 @@ public interface QueryConstants {
             TABLE_NAME + "," + STREAM_NAME + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
             ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
-            TRANSACTIONAL + "=" + Boolean.FALSE;
+            TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
+            UPDATE_CACHE_FREQUENCY + "=" + "7200000";
 
     String CREATE_CDC_STREAM_METADATA = "CREATE TABLE " + 
SYSTEM_CATALOG_SCHEMA + ".\"" +
             SYSTEM_CDC_STREAM_TABLE + "\"(\n" +
@@ -682,5 +683,6 @@ public interface QueryConstants {
             TABLE_NAME + "," + STREAM_NAME + "," + PARTITION_ID + "," + 
PARENT_PARTITION_ID + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
             ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
-            TRANSACTIONAL + "=" + Boolean.FALSE;
+            TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
+            UPDATE_CACHE_FREQUENCY + "=" + "7200000";
 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 3117c5cebc..b728273cf0 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Date;
 import java.util.HashSet;
@@ -39,7 +40,6 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PVarchar;
 import org.bson.RawBsonDocument;
 
 public class CDCUtil {
@@ -99,6 +99,15 @@ public class CDCUtil {
         return indexName.startsWith(CDC_INDEX_PREFIX);
     }
 
+    public static boolean isCDCIndex(byte[] indexNameBytes) {
+        String indexName = Bytes.toString(indexNameBytes);
+        return isCDCIndex(indexName);
+    }
+
+    public static byte[] getCdcObjectName(byte[] cdcIndexName) {
+        return Arrays.copyOfRange(cdcIndexName, CDC_INDEX_PREFIX.length(), 
cdcIndexName.length);
+    }
+
     public static boolean isCDCIndex(PTable indexTable) {
         return isCDCIndex(indexTable.getTableName().getString());
     }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 9165bd278c..6253b66eb1 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3333,6 +3333,11 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
 
         // Recursively delete indexes
         for (byte[] indexName : indexNames) {
+            if (CDCUtil.isCDCIndex(indexName)) {
+                byte[] cdcKey = SchemaUtil.getTableKey(tenantId, schemaName, 
CDCUtil.getCdcObjectName(indexName));
+                Delete deleteCdc = new Delete(cdcKey, clientTimeStamp);
+                catalogMutations.add(deleteCdc);
+            }
             byte[] indexKey = SchemaUtil.getTableKey(tenantId, schemaName, 
indexName);
             // FIXME: Remove when unintentionally deprecated method is fixed 
(HBASE-7870).
             // FIXME: the version of the Delete constructor without the lock 
args was introduced
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index ce41e98e85..e2bcf73642 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -276,33 +276,34 @@ public class PhoenixMasterObserver implements 
MasterObserver, MasterCoprocessor
             qb.append(" AND PARTITION_END_KEY = ? ");
         }
 
-        PreparedStatement pstmt = conn.prepareStatement(qb.toString());
-        int index = 1;
-        pstmt.setString(index++, tableName);
-        pstmt.setString(index++, streamName);
-        if (parentStartKey.length > 0) pstmt.setBytes(index++, parentStartKey);
-        if (parentEndKey.length > 0) pstmt.setBytes(index++, parentEndKey);
-        LOGGER.info("Query to get parent partition id: " + pstmt);
-
         List<String> ancestorIDs = new ArrayList<>();
-        ResultSet rs = pstmt.executeQuery();
         List<Long> parentPartitionStartTimes = new ArrayList<>();
-        if (rs.next()) {
-            ancestorIDs.add(rs.getString(1));
-            ancestorIDs.add(rs.getString(2));
-            parentPartitionStartTimes.add(rs.getLong(3));
-        } else {
-            throw new ParentPartitionNotFound(
-                    String.format("Could not find parent of the provided 
daughters: "
-                                    + "startKeyA=%s endKeyA=%s startKeyB=%s 
endKeyB=%s",
-                    Bytes.toStringBinary(regionInfoA.getStartKey()),
-                    Bytes.toStringBinary(regionInfoA.getEndKey()),
-                    Bytes.toStringBinary(regionInfoB.getStartKey()),
-                    Bytes.toStringBinary(regionInfoB.getEndKey())));
-        }
-        // if parent was a result of a merge, there will be multiple 
grandparents.
-        while (rs.next()) {
-            ancestorIDs.add(rs.getString(2));
+        try (PreparedStatement pstmt = conn.prepareStatement(qb.toString())) {
+            int index = 1;
+            pstmt.setString(index++, tableName);
+            pstmt.setString(index++, streamName);
+            if (parentStartKey.length > 0) pstmt.setBytes(index++, 
parentStartKey);
+            if (parentEndKey.length > 0) pstmt.setBytes(index++, parentEndKey);
+            LOGGER.info("Query to get parent partition id: " + pstmt);
+
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                ancestorIDs.add(rs.getString(1));
+                ancestorIDs.add(rs.getString(2));
+                parentPartitionStartTimes.add(rs.getLong(3));
+            } else {
+                throw new ParentPartitionNotFound(
+                        String.format("Could not find parent of the provided 
daughters: "
+                                        + "startKeyA=%s endKeyA=%s 
startKeyB=%s endKeyB=%s",
+                                
Bytes.toStringBinary(regionInfoA.getStartKey()),
+                                Bytes.toStringBinary(regionInfoA.getEndKey()),
+                                
Bytes.toStringBinary(regionInfoB.getStartKey()),
+                                
Bytes.toStringBinary(regionInfoB.getEndKey())));
+            }
+            // if parent was a result of a merge, there will be multiple 
grandparents.
+            while (rs.next()) {
+                ancestorIDs.add(rs.getString(2));
+            }
         }
         return new Pair<>(ancestorIDs, parentPartitionStartTimes);
     }
@@ -316,21 +317,22 @@ public class PhoenixMasterObserver implements 
MasterObserver, MasterCoprocessor
                                                   RegionInfo parent) throws 
SQLException {
         List<String> ancestorIDs = new ArrayList<>();
         ancestorIDs.add(parent.getEncodedName());
-        PreparedStatement pstmt = 
conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE);
-        pstmt.setString(1, tableName);
-        pstmt.setString(2, streamName);
-        pstmt.setString(3, parent.getEncodedName());
-        ResultSet rs = pstmt.executeQuery();
-        if (rs.next()) {
-            ancestorIDs.add(rs.getString(1));
-        } else {
-            throw new ParentPartitionNotFound(String.format(
-                    "Could not find parent of the provided merged region: %s",
-                    parent.getEncodedName()));
-        }
-        // if parent was a result of a merge, there will be multiple 
grandparents.
-        while (rs.next()) {
-            ancestorIDs.add(rs.getString(1));
+        try (PreparedStatement pstmt = 
conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE)) {
+            pstmt.setString(1, tableName);
+            pstmt.setString(2, streamName);
+            pstmt.setString(3, parent.getEncodedName());
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                ancestorIDs.add(rs.getString(1));
+            } else {
+                throw new ParentPartitionNotFound(String.format(
+                        "Could not find parent of the provided merged region: 
%s",
+                        parent.getEncodedName()));
+            }
+            // if parent was a result of a merge, there will be multiple 
grandparents.
+            while (rs.next()) {
+                ancestorIDs.add(rs.getString(1));
+            }
         }
         return ancestorIDs;
     }
@@ -346,27 +348,28 @@ public class PhoenixMasterObserver implements 
MasterObserver, MasterCoprocessor
                                          List<Long> parentPartitionStartTimes)
             throws SQLException {
         conn.setAutoCommit(false);
-        PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
-        for (RegionInfo daughter : daughters) {
-            for (int i=0; i<parentPartitionIDs.size(); i++) {
-                String partitionId = daughter.getEncodedName();
-                long startTime = daughter.getRegionId();
-                byte[] startKey = daughter.getStartKey();
-                byte[] endKey = daughter.getEndKey();
-                pstmt.setString(1, tableName);
-                pstmt.setString(2, streamName);
-                pstmt.setString(3, partitionId);
-                pstmt.setString(4, parentPartitionIDs.get(i));
-                pstmt.setLong(5, startTime);
-                // endTime in not set when inserting a new partition
-                pstmt.setNull(6, Types.BIGINT);
-                pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
-                pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
-                pstmt.setLong(9, parentPartitionStartTimes.get(i));
-                pstmt.executeUpdate();
+        try (PreparedStatement pstmt = 
conn.prepareStatement(PARTITION_UPSERT_SQL)) {
+            for (RegionInfo daughter : daughters) {
+                for (int i=0; i<parentPartitionIDs.size(); i++) {
+                    String partitionId = daughter.getEncodedName();
+                    long startTime = daughter.getRegionId();
+                    byte[] startKey = daughter.getStartKey();
+                    byte[] endKey = daughter.getEndKey();
+                    pstmt.setString(1, tableName);
+                    pstmt.setString(2, streamName);
+                    pstmt.setString(3, partitionId);
+                    pstmt.setString(4, parentPartitionIDs.get(i));
+                    pstmt.setLong(5, startTime);
+                    // endTime in not set when inserting a new partition
+                    pstmt.setNull(6, Types.BIGINT);
+                    pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
+                    pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
+                    pstmt.setLong(9, parentPartitionStartTimes.get(i));
+                    pstmt.executeUpdate();
+                }
             }
+            conn.commit();
         }
-        conn.commit();
     }
 
     /**
@@ -380,29 +383,31 @@ public class PhoenixMasterObserver implements 
MasterObserver, MasterCoprocessor
                                               long daughterStartTime) throws 
SQLException {
         conn.setAutoCommit(false);
         // ancestorIDs = [parentID, grandparentID1, grandparentID2...]
-        PreparedStatement pstmt = 
conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL);
-        for (int i=1; i<ancestorIDs.size(); i++) {
-            pstmt.setString(1, tableName);
-            pstmt.setString(2, streamName);
-            pstmt.setString(3, ancestorIDs.get(0));
-            pstmt.setString(4, ancestorIDs.get(i));
-            pstmt.setLong(5, daughterStartTime);
-            pstmt.executeUpdate();
+        try (PreparedStatement pstmt = 
conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL)) {
+            for (int i=1; i<ancestorIDs.size(); i++) {
+                pstmt.setString(1, tableName);
+                pstmt.setString(2, streamName);
+                pstmt.setString(3, ancestorIDs.get(0));
+                pstmt.setString(4, ancestorIDs.get(i));
+                pstmt.setLong(5, daughterStartTime);
+                pstmt.executeUpdate();
+            }
+            conn.commit();
         }
-        conn.commit();
     }
 
     /**
      * Get the stream name on the given table if one exists in ENABLED state.
      */
     private String getStreamName(Connection conn, String tableName) throws 
SQLException {
-        PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
-        pstmt.setString(1, tableName);
-        ResultSet rs = pstmt.executeQuery();
-        if (rs.next()) {
-            return rs.getString(1);
-        } else {
-            return null;
+        try (PreparedStatement pstmt = 
conn.prepareStatement(STREAM_STATUS_QUERY)) {
+            pstmt.setString(1, tableName);
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                return rs.getString(1);
+            } else {
+                return null;
+            }
         }
     }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index bb3263a93f..42fb437656 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -129,7 +129,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
     }
 
     @Test
-    public void testCreateCaseSensitiveTable() throws Exception {
+    public void testCreateDropCaseSensitiveTable() throws Exception {
         Connection conn = newConnection();
         String tableName = "\"" + generateUniqueName().toLowerCase() + "\"";
         conn.createStatement().execute(
@@ -145,10 +145,21 @@ public class CDCDefinitionIT extends CDCBaseIT {
         String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
         conn.createStatement().execute(cdc_sql);
         conn.createStatement().executeQuery("SELECT * FROM " + cdcName);
+
+        String drop_sql = forView ? "DROP VIEW " + tableName : "DROP TABLE " + 
tableName;
+        conn.createStatement().execute(drop_sql);
+        String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+        try {
+            conn.createStatement().execute(drop_cdc_sql);
+            fail("Expected to fail as cdc table doesn't exist");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), 
e.getErrorCode());
+            
assertTrue(e.getMessage().endsWith(SchemaUtil.getUnEscapedFullName(cdcName)));
+        }
     }
 
     @Test
-    public void testCreateCaseSensitiveSchemaAndTable() throws Exception {
+    public void testCreateDropCaseSensitiveSchemaAndTable() throws Exception {
         Connection conn = newConnection();
         String schemaName = "\"" + generateUniqueName().toLowerCase() + "\"";
         String tableName = SchemaUtil.getTableName(schemaName, "\"" + 
generateUniqueName().toLowerCase() + "\"");
@@ -166,6 +177,17 @@ public class CDCDefinitionIT extends CDCBaseIT {
         conn.createStatement().execute(cdc_sql);
         String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
         conn.createStatement().executeQuery("SELECT * FROM " + cdcFullName);
+
+        String drop_sql = forView ? "DROP VIEW " + tableName : "DROP TABLE " + 
tableName;
+        conn.createStatement().execute(drop_sql);
+        String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+        try {
+            conn.createStatement().execute(drop_cdc_sql);
+            fail("Expected to fail as cdc table doesn't exist");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), 
e.getErrorCode());
+            
assertTrue(e.getMessage().endsWith(SchemaUtil.getUnEscapedFullName(cdcName)));
+        }
     }
 
     @Test
@@ -250,6 +272,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
         assertEquals(indexTable.getEncodingScheme(), NON_ENCODED_QUALIFIERS);
     }
 
+    @Test
     public void testDropCDC () throws SQLException {
         Properties props = new Properties();
         Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -258,6 +281,8 @@ public class CDCDefinitionIT extends CDCBaseIT {
                 "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " 
v1 INTEGER,"
                         + " v2 DATE)");
         String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(cdc_sql);
 
         String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
         conn.createStatement().execute(drop_cdc_sql);
@@ -303,6 +328,44 @@ public class CDCDefinitionIT extends CDCBaseIT {
         }
     }
 
+    @Test
+    public void testDropTable() throws SQLException {
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " 
v1 INTEGER,"
+                        + " v2 DATE)");
+        String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(cdc_sql);
+
+        String drop_table_sql = "DROP TABLE " + tableName;
+        conn.createStatement().execute(drop_table_sql);
+
+        // index should have been dropped
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT 
index_type FROM " +
+                "system.catalog WHERE table_name = '" + 
CDCUtil.getCDCIndexName(cdcName) +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+            assertEquals(false, rs.next());
+        }
+        //cdc object should have been dropped
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT 
cdc_include FROM " +
+                "system.catalog WHERE table_name = '" + cdcName +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+            assertEquals(false, rs.next());
+        }
+
+        String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+        try {
+            conn.createStatement().execute(drop_cdc_sql);
+            fail("Expected to fail as cdc table doesn't exist");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), 
e.getErrorCode());
+            assertTrue(e.getMessage().endsWith(cdcName));
+        }
+    }
+
     @Test
     public void testSelectCDCBadIncludeSpec() throws Exception {
         Connection conn = newConnection();

Reply via email to