This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
     new 90d3e16c68 PHOENIX-7644 : CDC Stream improvements (#2196)
90d3e16c68 is described below

commit 90d3e16c687617b05f0a789afb8fd278ca51416e
Author: Palash Chauhan <palashc...@gmail.com>
AuthorDate: Fri Jun 20 10:26:24 2025 -0700

    PHOENIX-7644 : CDC Stream improvements (#2196)

    Co-authored-by: Palash Chauhan <p.chau...@pchauha-ltmgv47.internal.salesforce.com>
---
 .../org/apache/phoenix/query/QueryConstants.java   |   6 +-
 .../main/java/org/apache/phoenix/util/CDCUtil.java |  11 +-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |   5 +
 .../phoenix/coprocessor/PhoenixMasterObserver.java | 155 +++++++++++----------
 .../apache/phoenix/end2end/CDCDefinitionIT.java    |  67 ++++++++-
 5 files changed, 164 insertions(+), 80 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 02c27096a0..9911f5f0a3 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -663,7 +663,8 @@ public interface QueryConstants {
             TABLE_NAME + "," + STREAM_NAME + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
             ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
-            TRANSACTIONAL + "=" + Boolean.FALSE;
+            TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
+            UPDATE_CACHE_FREQUENCY + "=" + "7200000";
 
     String CREATE_CDC_STREAM_METADATA =
             "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CDC_STREAM_TABLE + "\"(\n" +
@@ -682,5 +683,6 @@ public interface QueryConstants {
             TABLE_NAME + "," + STREAM_NAME + "," + PARTITION_ID + "," + PARENT_PARTITION_ID + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
             ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
-            TRANSACTIONAL + "=" + Boolean.FALSE;
+            TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
+            UPDATE_CACHE_FREQUENCY + "=" + "7200000";
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 3117c5cebc..b728273cf0 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Date;
 import java.util.HashSet;
@@ -39,7 +40,6 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PVarchar;
 import org.bson.RawBsonDocument;
 
 public class CDCUtil {
@@ -99,6 +99,15 @@ public class CDCUtil {
         return indexName.startsWith(CDC_INDEX_PREFIX);
     }
 
+    public static boolean isCDCIndex(byte[] indexNameBytes) {
+        String indexName = Bytes.toString(indexNameBytes);
+        return isCDCIndex(indexName);
+    }
+
+    public static byte[] getCdcObjectName(byte[] cdcIndexName) {
+        return Arrays.copyOfRange(cdcIndexName, CDC_INDEX_PREFIX.length(), cdcIndexName.length);
+    }
+
     public static boolean isCDCIndex(PTable indexTable) {
         return isCDCIndex(indexTable.getTableName().getString());
     }
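For context, a minimal sketch (not part of the commit) of how the two new CDCUtil byte[] helpers above fit together: isCDCIndex() recognizes a CDC index name, and getCdcObjectName() strips the CDC index prefix to recover the name of the CDC object it backs. It assumes phoenix-core-client is on the classpath; "MY_CDC" is just an example name.

    import java.nio.charset.StandardCharsets;
    import org.apache.phoenix.util.CDCUtil;

    public class CdcIndexNameSketch {
        public static void main(String[] args) {
            // Build a CDC index name from an example CDC object name.
            byte[] indexName =
                    CDCUtil.getCDCIndexName("MY_CDC").getBytes(StandardCharsets.UTF_8);
            if (CDCUtil.isCDCIndex(indexName)) {          // new byte[] overload
                // Strips CDC_INDEX_PREFIX, leaving the CDC object name.
                byte[] cdcObjectName = CDCUtil.getCdcObjectName(indexName);
                System.out.println(new String(cdcObjectName, StandardCharsets.UTF_8)); // MY_CDC
            }
        }
    }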
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 9165bd278c..6253b66eb1 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3333,6 +3333,11 @@
                     TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             // Recursively delete indexes
             for (byte[] indexName : indexNames) {
+                if (CDCUtil.isCDCIndex(indexName)) {
+                    byte[] cdcKey = SchemaUtil.getTableKey(tenantId, schemaName, CDCUtil.getCdcObjectName(indexName));
+                    Delete deleteCdc = new Delete(cdcKey, clientTimeStamp);
+                    catalogMutations.add(deleteCdc);
+                }
                 byte[] indexKey = SchemaUtil.getTableKey(tenantId, schemaName, indexName);
                 // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
                 // FIXME: the version of the Delete constructor without the lock args was introduced
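The new branch above reuses the pattern already applied to index rows during a DROP TABLE: build the SYSTEM.CATALOG row key for the CDC object and add a row-level Delete at the drop's client timestamp. A minimal, standalone sketch of that HBase idiom (hypothetical row key and timestamp, outside of Phoenix):

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CatalogDeleteSketch {
        public static void main(String[] args) {
            // Illustrative key only; the real SYSTEM.CATALOG key joins tenant id, schema and
            // table name with null-byte separators (see SchemaUtil.getTableKey).
            byte[] cdcKey = Bytes.toBytes("MY_SCHEMA.MY_CDC");
            long clientTimeStamp = System.currentTimeMillis();
            // A Delete built with (row, timestamp) removes every cell in the row whose
            // timestamp is <= the given timestamp, keeping the CDC object's catalog row
            // consistent with the other drop mutations issued at the same client timestamp.
            Delete deleteCdc = new Delete(cdcKey, clientTimeStamp);
            System.out.println(deleteCdc);
        }
    }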
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index ce41e98e85..e2bcf73642 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -276,33 +276,34 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
             qb.append(" AND PARTITION_END_KEY = ? ");
         }
-        PreparedStatement pstmt = conn.prepareStatement(qb.toString());
-        int index = 1;
-        pstmt.setString(index++, tableName);
-        pstmt.setString(index++, streamName);
-        if (parentStartKey.length > 0) pstmt.setBytes(index++, parentStartKey);
-        if (parentEndKey.length > 0) pstmt.setBytes(index++, parentEndKey);
-        LOGGER.info("Query to get parent partition id: " + pstmt);
-        List<String> ancestorIDs = new ArrayList<>();
-        ResultSet rs = pstmt.executeQuery();
         List<Long> parentPartitionStartTimes = new ArrayList<>();
-        if (rs.next()) {
-            ancestorIDs.add(rs.getString(1));
-            ancestorIDs.add(rs.getString(2));
-            parentPartitionStartTimes.add(rs.getLong(3));
-        } else {
-            throw new ParentPartitionNotFound(
-                    String.format("Could not find parent of the provided daughters: "
-                                    + "startKeyA=%s endKeyA=%s startKeyB=%s endKeyB=%s",
-                            Bytes.toStringBinary(regionInfoA.getStartKey()),
-                            Bytes.toStringBinary(regionInfoA.getEndKey()),
-                            Bytes.toStringBinary(regionInfoB.getStartKey()),
-                            Bytes.toStringBinary(regionInfoB.getEndKey())));
-        }
-        // if parent was a result of a merge, there will be multiple grandparents.
-        while (rs.next()) {
-            ancestorIDs.add(rs.getString(2));
+        try (PreparedStatement pstmt = conn.prepareStatement(qb.toString())) {
+            int index = 1;
+            pstmt.setString(index++, tableName);
+            pstmt.setString(index++, streamName);
+            if (parentStartKey.length > 0) pstmt.setBytes(index++, parentStartKey);
+            if (parentEndKey.length > 0) pstmt.setBytes(index++, parentEndKey);
+            LOGGER.info("Query to get parent partition id: " + pstmt);
+
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                ancestorIDs.add(rs.getString(1));
+                ancestorIDs.add(rs.getString(2));
+                parentPartitionStartTimes.add(rs.getLong(3));
+            } else {
+                throw new ParentPartitionNotFound(
+                        String.format("Could not find parent of the provided daughters: "
+                                        + "startKeyA=%s endKeyA=%s startKeyB=%s endKeyB=%s",
+                                Bytes.toStringBinary(regionInfoA.getStartKey()),
+                                Bytes.toStringBinary(regionInfoA.getEndKey()),
+                                Bytes.toStringBinary(regionInfoB.getStartKey()),
+                                Bytes.toStringBinary(regionInfoB.getEndKey())));
+            }
+            // if parent was a result of a merge, there will be multiple grandparents.
+            while (rs.next()) {
+                ancestorIDs.add(rs.getString(2));
+            }
         }
         return new Pair<>(ancestorIDs, parentPartitionStartTimes);
     }
@@ -316,21 +317,22 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
                                                    RegionInfo parent) throws SQLException {
         List<String> ancestorIDs = new ArrayList<>();
         ancestorIDs.add(parent.getEncodedName());
-        PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE);
-        pstmt.setString(1, tableName);
-        pstmt.setString(2, streamName);
-        pstmt.setString(3, parent.getEncodedName());
-        ResultSet rs = pstmt.executeQuery();
-        if (rs.next()) {
-            ancestorIDs.add(rs.getString(1));
-        } else {
-            throw new ParentPartitionNotFound(String.format(
-                    "Could not find parent of the provided merged region: %s",
-                    parent.getEncodedName()));
-        }
-        // if parent was a result of a merge, there will be multiple grandparents.
-        while (rs.next()) {
-            ancestorIDs.add(rs.getString(1));
+        try (PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE)) {
+            pstmt.setString(1, tableName);
+            pstmt.setString(2, streamName);
+            pstmt.setString(3, parent.getEncodedName());
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                ancestorIDs.add(rs.getString(1));
+            } else {
+                throw new ParentPartitionNotFound(String.format(
+                        "Could not find parent of the provided merged region: %s",
+                        parent.getEncodedName()));
+            }
+            // if parent was a result of a merge, there will be multiple grandparents.
+            while (rs.next()) {
+                ancestorIDs.add(rs.getString(1));
+            }
         }
         return ancestorIDs;
     }
@@ -346,27 +348,28 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
                                            List<Long> parentPartitionStartTimes)
             throws SQLException {
         conn.setAutoCommit(false);
-        PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
-        for (RegionInfo daughter : daughters) {
-            for (int i=0; i<parentPartitionIDs.size(); i++) {
-                String partitionId = daughter.getEncodedName();
-                long startTime = daughter.getRegionId();
-                byte[] startKey = daughter.getStartKey();
-                byte[] endKey = daughter.getEndKey();
-                pstmt.setString(1, tableName);
-                pstmt.setString(2, streamName);
-                pstmt.setString(3, partitionId);
-                pstmt.setString(4, parentPartitionIDs.get(i));
-                pstmt.setLong(5, startTime);
-                // endTime in not set when inserting a new partition
-                pstmt.setNull(6, Types.BIGINT);
-                pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
-                pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
-                pstmt.setLong(9, parentPartitionStartTimes.get(i));
-                pstmt.executeUpdate();
+        try (PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL)) {
+            for (RegionInfo daughter : daughters) {
+                for (int i=0; i<parentPartitionIDs.size(); i++) {
+                    String partitionId = daughter.getEncodedName();
+                    long startTime = daughter.getRegionId();
+                    byte[] startKey = daughter.getStartKey();
+                    byte[] endKey = daughter.getEndKey();
+                    pstmt.setString(1, tableName);
+                    pstmt.setString(2, streamName);
+                    pstmt.setString(3, partitionId);
+                    pstmt.setString(4, parentPartitionIDs.get(i));
+                    pstmt.setLong(5, startTime);
+                    // endTime in not set when inserting a new partition
+                    pstmt.setNull(6, Types.BIGINT);
+                    pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
+                    pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
+                    pstmt.setLong(9, parentPartitionStartTimes.get(i));
+                    pstmt.executeUpdate();
+                }
             }
+            conn.commit();
         }
-        conn.commit();
     }
@@ -380,29 +383,31 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
                                            long daughterStartTime) throws SQLException {
         conn.setAutoCommit(false);
         // ancestorIDs = [parentID, grandparentID1, grandparentID2...]
-        PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL);
-        for (int i=1; i<ancestorIDs.size(); i++) {
-            pstmt.setString(1, tableName);
-            pstmt.setString(2, streamName);
-            pstmt.setString(3, ancestorIDs.get(0));
-            pstmt.setString(4, ancestorIDs.get(i));
-            pstmt.setLong(5, daughterStartTime);
-            pstmt.executeUpdate();
+        try (PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL)) {
+            for (int i=1; i<ancestorIDs.size(); i++) {
+                pstmt.setString(1, tableName);
+                pstmt.setString(2, streamName);
+                pstmt.setString(3, ancestorIDs.get(0));
+                pstmt.setString(4, ancestorIDs.get(i));
+                pstmt.setLong(5, daughterStartTime);
+                pstmt.executeUpdate();
+            }
+            conn.commit();
         }
-        conn.commit();
     }
 
     /**
      * Get the stream name on the given table if one exists in ENABLED state.
      */
     private String getStreamName(Connection conn, String tableName) throws SQLException {
-        PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
-        pstmt.setString(1, tableName);
-        ResultSet rs = pstmt.executeQuery();
-        if (rs.next()) {
-            return rs.getString(1);
-        } else {
-            return null;
+        try (PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY)) {
+            pstmt.setString(1, tableName);
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                return rs.getString(1);
+            } else {
+                return null;
+            }
        }
     }
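The recurring change in this file is wrapping each PreparedStatement in try-with-resources so the statement is always closed, even when the query or an update throws, with the commit issued inside the same block. A minimal, self-contained JDBC sketch of that pattern (hypothetical URL, table and column names; not Phoenix-specific):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class TryWithResourcesSketch {
        // Returns the first matching NAME, or null; the statement and connection are
        // closed automatically on every exit path, including exceptions.
        static String lookup(String url, String table, String key) throws SQLException {
            try (Connection conn = DriverManager.getConnection(url);
                 PreparedStatement pstmt = conn.prepareStatement(
                         "SELECT NAME FROM " + table + " WHERE ID = ?")) {
                pstmt.setString(1, key);
                try (ResultSet rs = pstmt.executeQuery()) {
                    return rs.next() ? rs.getString(1) : null;
                }
            }
        }
    }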
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index bb3263a93f..42fb437656 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -129,7 +129,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
     }
 
     @Test
-    public void testCreateCaseSensitiveTable() throws Exception {
+    public void testCreateDropCaseSensitiveTable() throws Exception {
         Connection conn = newConnection();
         String tableName = "\"" + generateUniqueName().toLowerCase() + "\"";
         conn.createStatement().execute(
@@ -145,10 +145,21 @@ public class CDCDefinitionIT extends CDCBaseIT {
         String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
         conn.createStatement().execute(cdc_sql);
         conn.createStatement().executeQuery("SELECT * FROM " + cdcName);
+
+        String drop_sql = forView ? "DROP VIEW " + tableName : "DROP TABLE " + tableName;
+        conn.createStatement().execute(drop_sql);
+        String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+        try {
+            conn.createStatement().execute(drop_cdc_sql);
+            fail("Expected to fail as cdc table doesn't exist");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
+            assertTrue(e.getMessage().endsWith(SchemaUtil.getUnEscapedFullName(cdcName)));
+        }
     }
 
     @Test
-    public void testCreateCaseSensitiveSchemaAndTable() throws Exception {
+    public void testCreateDropCaseSensitiveSchemaAndTable() throws Exception {
         Connection conn = newConnection();
         String schemaName = "\"" + generateUniqueName().toLowerCase() + "\"";
         String tableName = SchemaUtil.getTableName(schemaName, "\"" + generateUniqueName().toLowerCase() + "\"");
"DROP VIEW " + tableName : "DROP TABLE " + tableName; + conn.createStatement().execute(drop_sql); + String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName; + try { + conn.createStatement().execute(drop_cdc_sql); + fail("Expected to fail as cdc table doesn't exist"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode()); + assertTrue(e.getMessage().endsWith(SchemaUtil.getUnEscapedFullName(cdcName))); + } } @Test - public void testCreateCaseSensitiveSchemaAndTable() throws Exception { + public void testCreateDropCaseSensitiveSchemaAndTable() throws Exception { Connection conn = newConnection(); String schemaName = "\"" + generateUniqueName().toLowerCase() + "\""; String tableName = SchemaUtil.getTableName(schemaName, "\"" + generateUniqueName().toLowerCase() + "\""); @@ -166,6 +177,17 @@ public class CDCDefinitionIT extends CDCBaseIT { conn.createStatement().execute(cdc_sql); String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); conn.createStatement().executeQuery("SELECT * FROM " + cdcFullName); + + String drop_sql = forView ? "DROP VIEW " + tableName : "DROP TABLE " + tableName; + conn.createStatement().execute(drop_sql); + String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName; + try { + conn.createStatement().execute(drop_cdc_sql); + fail("Expected to fail as cdc table doesn't exist"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode()); + assertTrue(e.getMessage().endsWith(SchemaUtil.getUnEscapedFullName(cdcName))); + } } @Test @@ -250,6 +272,7 @@ public class CDCDefinitionIT extends CDCBaseIT { assertEquals(indexTable.getEncodingScheme(), NON_ENCODED_QUALIFIERS); } + @Test public void testDropCDC () throws SQLException { Properties props = new Properties(); Connection conn = DriverManager.getConnection(getUrl(), props); @@ -258,6 +281,8 @@ public class CDCDefinitionIT extends CDCBaseIT { "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER," + " v2 DATE)"); String cdcName = generateUniqueName(); + String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName; + conn.createStatement().execute(cdc_sql); String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName; conn.createStatement().execute(drop_cdc_sql); @@ -303,6 +328,44 @@ public class CDCDefinitionIT extends CDCBaseIT { } } + @Test + public void testDropTable() throws SQLException { + Properties props = new Properties(); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + conn.createStatement().execute( + "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER," + + " v2 DATE)"); + String cdcName = generateUniqueName(); + String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName; + conn.createStatement().execute(cdc_sql); + + String drop_table_sql = "DROP TABLE " + tableName; + conn.createStatement().execute(drop_table_sql); + + // index should have been dropped + try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " + + "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) + + "' AND column_name IS NULL and column_family IS NULL")) { + assertEquals(false, rs.next()); + } + //cdc object should have been dropped + try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " + + "system.catalog WHERE table_name = '" + cdcName + + "' AND column_name IS NULL and column_family IS NULL")) { + assertEquals(false, 
@@ -303,6 +328,44 @@ public class CDCDefinitionIT extends CDCBaseIT {
         }
     }
 
+    @Test
+    public void testDropTable() throws SQLException {
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                        + " v2 DATE)");
+        String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(cdc_sql);
+
+        String drop_table_sql = "DROP TABLE " + tableName;
+        conn.createStatement().execute(drop_table_sql);
+
+        // index should have been dropped
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM "
+                + "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName)
+                + "' AND column_name IS NULL and column_family IS NULL")) {
+            assertEquals(false, rs.next());
+        }
+        //cdc object should have been dropped
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM "
+                + "system.catalog WHERE table_name = '" + cdcName
+                + "' AND column_name IS NULL and column_family IS NULL")) {
+            assertEquals(false, rs.next());
+        }
+
+        String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+        try {
+            conn.createStatement().execute(drop_cdc_sql);
+            fail("Expected to fail as cdc table doesn't exist");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
+            assertTrue(e.getMessage().endsWith(cdcName));
+        }
+    }
+
     @Test
     public void testSelectCDCBadIncludeSpec() throws Exception {
         Connection conn = newConnection();