This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 86be1e56d5 PHOENIX-7500 : Add PARENT_PARTITION_ID to SYSTEM.CDC_STREAM
table's composite pk (#2052)
86be1e56d5 is described below
commit 86be1e56d52506e282bd41feebfcce99a156ce14
Author: Palash Chauhan <[email protected]>
AuthorDate: Wed Jan 8 08:29:47 2025 -0800
PHOENIX-7500 : Add PARENT_PARTITION_ID to SYSTEM.CDC_STREAM table's
composite pk (#2052)
Co-authored-by: Palash Chauhan
<[email protected]>
---
.../org/apache/phoenix/query/QueryConstants.java | 4 +-
.../phoenix/coprocessor/PhoenixMasterObserver.java | 45 ++++++++++++++--------
2 files changed, 31 insertions(+), 18 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 62142a454c..14e9b05fff 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -670,14 +670,14 @@ public interface QueryConstants {
TABLE_NAME + " VARCHAR NOT NULL," +
STREAM_NAME + " VARCHAR NOT NULL," +
PARTITION_ID + " VARCHAR NOT NULL," +
- // Non-PK columns
PARENT_PARTITION_ID + " VARCHAR," +
+ // Non-PK columns
PARTITION_START_TIME + " BIGINT," +
PARTITION_END_TIME + " BIGINT," +
PARTITION_START_KEY + " VARBINARY_ENCODED," +
PARTITION_END_KEY + " VARBINARY_ENCODED,\n" +
"CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
- TABLE_NAME + "," + STREAM_NAME + "," + PARTITION_ID + "))\n" +
+ TABLE_NAME + "," + STREAM_NAME + "," + PARTITION_ID + "," +
PARENT_PARTITION_ID + "))\n" +
HConstants.VERSIONS + "=%s,\n" +
ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
TRANSACTIONAL + "=" + Boolean.FALSE;
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index 80cfbc718d..f51cf7c154 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -22,6 +22,8 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
@@ -59,12 +61,12 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
= "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES
(?,?,?,?,?,?,?,?)";
private static final String PARENT_PARTITION_QUERY
- = "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
+ = "SELECT PARTITION_ID, PARENT_PARTITION_ID FROM " +
SYSTEM_CDC_STREAM_NAME
+ " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
= "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME,
STREAM_NAME, PARTITION_ID, "
- + "PARTITION_END_TIME) VALUES (?,?,?,?)";
+ + "PARENT_PARTITION_ID, PARTITION_END_TIME) VALUES (?,?,?,?,?)";
@Override
public Optional<MasterObserver> getMasterObserver() {
@@ -85,8 +87,7 @@ public class PhoenixMasterObserver implements MasterObserver,
MasterCoprocessor
final RegionInfo regionInfoA,
final RegionInfo regionInfoB) {
Configuration conf = c.getEnvironment().getConfiguration();
- try {
- Connection conn = QueryUtil.getConnectionOnServer(conf);
+ try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
// CDC will be enabled on Phoenix tables only
PTable phoenixTable = getPhoenixTable(conn,
regionInfoA.getTable());
if (phoenixTable == null) {
@@ -103,10 +104,11 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
String streamName = rs.getString(1);
LOGGER.info("Updating partition metadata for table={},
stream={} daughters {} {}",
tableName, streamName, regionInfoA.getEncodedName(),
regionInfoB.getEncodedName());
- String parentPartitionID = getParentPartitionId(conn,
tableName, streamName, regionInfoA, regionInfoB);
- upsertDaughterPartition(conn, tableName, streamName,
parentPartitionID, regionInfoA);
- upsertDaughterPartition(conn, tableName, streamName,
parentPartitionID, regionInfoB);
- updateParentPartitionEndTime(conn, tableName, streamName,
parentPartitionID, regionInfoA.getRegionId());
+ // ancestorIDs = [parentId, grandparentId1]
+ List<String> ancestorIDs = getAncestorIds(conn, tableName,
streamName, regionInfoA, regionInfoB);
+ upsertDaughterPartition(conn, tableName, streamName,
ancestorIDs.get(0), regionInfoA);
+ upsertDaughterPartition(conn, tableName, streamName,
ancestorIDs.get(0), regionInfoB);
+ updateParentPartitionEndTime(conn, tableName, streamName,
ancestorIDs, regionInfoA.getRegionId());
}
} catch (SQLException e) {
LOGGER.error("Unable to update CDC Stream Partition metadata
during split with daughter regions: {} {}",
@@ -127,9 +129,12 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
/**
* Lookup parent's partition id (region's encoded name) in
SYSTEM.CDC_STREAM.
* RegionInfoA is left daughter and RegionInfoB is right daughter so
parent's key range would
- * be [RegionInfoA stratKey, RegionInfoB endKey]
+ * be [RegionInfoA startKey, RegionInfoB endKey]
+ * Return both parent and grandparent partition ids.
+ *
+ * TODO: When we implement merges in this coproc, there could be multiple
grandparents.
*/
- private String getParentPartitionId(Connection conn, String tableName,
String streamName,
+ private List<String> getAncestorIds(Connection conn, String tableName,
String streamName,
RegionInfo regionInfoA, RegionInfo
regionInfoB)
throws SQLException {
byte[] parentStartKey = regionInfoA.getStartKey();
@@ -154,9 +159,12 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
if (parentStartKey.length > 0) pstmt.setBytes(index++, parentStartKey);
if (parentEndKey.length > 0) pstmt.setBytes(index++, parentEndKey);
LOGGER.info("Query to get parent partition id: " + pstmt);
+
+ List<String> ancestorIDs = new ArrayList<>();
ResultSet rs = pstmt.executeQuery();
if (rs.next()) {
- return rs.getString(1);
+ ancestorIDs.add(rs.getString(1));
+ ancestorIDs.add(rs.getString(2));
} else {
throw new SQLException(String.format("Could not find parent of the
provided daughters: "
+ "startKeyA=%s endKeyA=%s startKeyB=%s
endKeyB=%s",
@@ -165,6 +173,7 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
Bytes.toStringBinary(regionInfoB.getStartKey()),
Bytes.toStringBinary(regionInfoB.getEndKey())));
}
+ return ancestorIDs;
}
/**
@@ -193,16 +202,20 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
}
/**
- * Update parent partition's endTime by setting it to daughter's startTime.
+ * Update endTime in all rows of parent partition by setting it to
daughter's startTime.
+ *
+ * TODO: When we implement merges in this coproc, update all rows of the
parent.
*/
private void updateParentPartitionEndTime(Connection conn, String
tableName,
- String streamName, String
parentPartitionID,
- long daughterStartTime) throws
SQLException {
+ String streamName, List<String>
ancestorIDs,
+ long daughterStartTime) throws
SQLException {
+ // ancestorIDs = [parentID, grandparentID]
PreparedStatement pstmt =
conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL);
pstmt.setString(1, tableName);
pstmt.setString(2, streamName);
- pstmt.setString(3, parentPartitionID);
- pstmt.setLong(4, daughterStartTime);
+ pstmt.setString(3, ancestorIDs.get(0));
+ pstmt.setString(4, ancestorIDs.get(1));
+ pstmt.setLong(5, daughterStartTime);
pstmt.executeUpdate();
conn.commit();
}