This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 86be1e56d5 PHOENIX-7500 : Add PARENT_PARTITION_ID to SYSTEM.CDC_STREAM table's composite pk (#2052)
86be1e56d5 is described below

commit 86be1e56d52506e282bd41feebfcce99a156ce14
Author: Palash Chauhan <[email protected]>
AuthorDate: Wed Jan 8 08:29:47 2025 -0800

    PHOENIX-7500 : Add PARENT_PARTITION_ID to SYSTEM.CDC_STREAM table's composite pk (#2052)
    
    Co-authored-by: Palash Chauhan <[email protected]>
---
 .../org/apache/phoenix/query/QueryConstants.java   |  4 +-
 .../phoenix/coprocessor/PhoenixMasterObserver.java | 45 ++++++++++++++--------
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 62142a454c..14e9b05fff 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -670,14 +670,14 @@ public interface QueryConstants {
             TABLE_NAME + " VARCHAR NOT NULL," +
             STREAM_NAME + " VARCHAR NOT NULL," +
             PARTITION_ID + " VARCHAR NOT NULL," +
-            // Non-PK columns
             PARENT_PARTITION_ID + " VARCHAR," +
+            // Non-PK columns
             PARTITION_START_TIME + " BIGINT," +
             PARTITION_END_TIME + " BIGINT," +
             PARTITION_START_KEY + " VARBINARY_ENCODED," +
             PARTITION_END_KEY + " VARBINARY_ENCODED,\n" +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
-            TABLE_NAME + "," + STREAM_NAME + "," + PARTITION_ID + "))\n" +
+            TABLE_NAME + "," + STREAM_NAME + "," + PARTITION_ID + "," + 
PARENT_PARTITION_ID + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
             ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
             TRANSACTIONAL + "=" + Boolean.FALSE;
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index 80cfbc718d..f51cf7c154 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -22,6 +22,8 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 
 import org.apache.hadoop.conf.Configuration;
@@ -59,12 +61,12 @@ public class PhoenixMasterObserver implements 
MasterObserver, MasterCoprocessor
             = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES 
(?,?,?,?,?,?,?,?)";
 
     private static final String PARENT_PARTITION_QUERY
-            = "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
+            = "SELECT PARTITION_ID, PARENT_PARTITION_ID FROM " + 
SYSTEM_CDC_STREAM_NAME
             + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
 
     private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
             = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, 
STREAM_NAME, PARTITION_ID, "
-            + "PARTITION_END_TIME) VALUES (?,?,?,?)";
+            + "PARENT_PARTITION_ID, PARTITION_END_TIME) VALUES (?,?,?,?,?)";
 
     @Override
     public Optional<MasterObserver> getMasterObserver() {
@@ -85,8 +87,7 @@ public class PhoenixMasterObserver implements MasterObserver, 
MasterCoprocessor
                                                final RegionInfo regionInfoA,
                                                final RegionInfo regionInfoB) {
         Configuration conf = c.getEnvironment().getConfiguration();
-        try {
-            Connection conn  = QueryUtil.getConnectionOnServer(conf);
+        try (Connection conn  = QueryUtil.getConnectionOnServer(conf)) {
             // CDC will be enabled on Phoenix tables only
             PTable phoenixTable = getPhoenixTable(conn, 
regionInfoA.getTable());
             if (phoenixTable == null) {
@@ -103,10 +104,11 @@ public class PhoenixMasterObserver implements 
MasterObserver, MasterCoprocessor
                 String streamName = rs.getString(1);
                 LOGGER.info("Updating partition metadata for table={}, 
stream={} daughters {} {}",
                         tableName, streamName, regionInfoA.getEncodedName(), 
regionInfoB.getEncodedName());
-                String parentPartitionID = getParentPartitionId(conn, 
tableName, streamName, regionInfoA, regionInfoB);
-                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoA);
-                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoB);
-                updateParentPartitionEndTime(conn, tableName, streamName, 
parentPartitionID, regionInfoA.getRegionId());
+                // ancestorIDs = [parentId, grandparentId1]
+                List<String> ancestorIDs = getAncestorIds(conn, tableName, 
streamName, regionInfoA, regionInfoB);
+                upsertDaughterPartition(conn, tableName, streamName, 
ancestorIDs.get(0), regionInfoA);
+                upsertDaughterPartition(conn, tableName, streamName, 
ancestorIDs.get(0), regionInfoB);
+                updateParentPartitionEndTime(conn, tableName, streamName, 
ancestorIDs, regionInfoA.getRegionId());
             }
         } catch (SQLException e) {
             LOGGER.error("Unable to update CDC Stream Partition metadata 
during split with daughter regions: {} {}",
@@ -127,9 +129,12 @@ public class PhoenixMasterObserver implements 
MasterObserver, MasterCoprocessor
     /**
      * Lookup parent's partition id (region's encoded name) in 
SYSTEM.CDC_STREAM.
      * RegionInfoA is left daughter and RegionInfoB is right daughter so 
parent's key range would
-     * be [RegionInfoA stratKey, RegionInfoB endKey]
+     * be [RegionInfoA startKey, RegionInfoB endKey]
+     * Return both parent and grandparent partition ids.
+     *
+     * TODO: When we implement merges in this coproc, there could be multiple 
grandparents.
      */
-    private String getParentPartitionId(Connection conn, String tableName, 
String streamName,
+    private List<String> getAncestorIds(Connection conn, String tableName, 
String streamName,
                                         RegionInfo regionInfoA, RegionInfo 
regionInfoB)
             throws SQLException {
         byte[] parentStartKey = regionInfoA.getStartKey();
@@ -154,9 +159,12 @@ public class PhoenixMasterObserver implements 
MasterObserver, MasterCoprocessor
         if (parentStartKey.length > 0) pstmt.setBytes(index++, parentStartKey);
         if (parentEndKey.length > 0) pstmt.setBytes(index++, parentEndKey);
         LOGGER.info("Query to get parent partition id: " + pstmt);
+
+        List<String> ancestorIDs = new ArrayList<>();
         ResultSet rs = pstmt.executeQuery();
         if (rs.next()) {
-            return rs.getString(1);
+            ancestorIDs.add(rs.getString(1));
+            ancestorIDs.add(rs.getString(2));
         } else {
             throw new SQLException(String.format("Could not find parent of the 
provided daughters: "
                             + "startKeyA=%s endKeyA=%s startKeyB=%s 
endKeyB=%s",
@@ -165,6 +173,7 @@ public class PhoenixMasterObserver implements 
MasterObserver, MasterCoprocessor
                     Bytes.toStringBinary(regionInfoB.getStartKey()),
                     Bytes.toStringBinary(regionInfoB.getEndKey())));
         }
+        return ancestorIDs;
     }
 
     /**
@@ -193,16 +202,20 @@ public class PhoenixMasterObserver implements 
MasterObserver, MasterCoprocessor
     }
 
     /**
-     * Update parent partition's endTime by setting it to daughter's startTime.
+     * Update endTime in all rows of parent partition by setting it to 
daughter's startTime.
+     *
+     * TODO: When we implement merges in this coproc, update all rows of the 
parent.
      */
     private void updateParentPartitionEndTime(Connection conn, String 
tableName,
-                                              String streamName, String 
parentPartitionID,
-                                              long daughterStartTime) throws 
SQLException {
+                                              String streamName, List<String> 
ancestorIDs,
+                                          long daughterStartTime) throws 
SQLException {
+        // ancestorIDs = [parentID, grandparentID]
         PreparedStatement pstmt = 
conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL);
         pstmt.setString(1, tableName);
         pstmt.setString(2, streamName);
-        pstmt.setString(3, parentPartitionID);
-        pstmt.setLong(4, daughterStartTime);
+        pstmt.setString(3, ancestorIDs.get(0));
+        pstmt.setString(4, ancestorIDs.get(1));
+        pstmt.setLong(5, daughterStartTime);
         pstmt.executeUpdate();
         conn.commit();
     }

Reply via email to