haridsv commented on code in PR #2057: URL: https://github.com/apache/phoenix/pull/2057#discussion_r1926393642
########## phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java: ########## @@ -95,52 +101,92 @@ public void postCompletedSplitRegionAction(final ObserverContext<MasterCoprocess regionInfoA.getTable()); return; } - // find streamName with ENABLED status String tableName = phoenixTable.getName().getString(); - PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY); - pstmt.setString(1, tableName); - ResultSet rs = pstmt.executeQuery(); - if (rs.next()) { - String streamName = rs.getString(1); - LOGGER.info("Updating partition metadata for table={}, stream={} daughters {} {}", + String streamName = getStreamName(conn, tableName); + if (streamName != null) { + LOGGER.info("Updating split partition metadata for table={}, stream={} daughters {} {}", tableName, streamName, regionInfoA.getEncodedName(), regionInfoB.getEncodedName()); - // ancestorIDs = [parentId, grandparentId1] - List<String> ancestorIDs = getAncestorIds(conn, tableName, streamName, regionInfoA, regionInfoB); - upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoA); - upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoB); - updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, regionInfoA.getRegionId()); + // ancestorIDs = [parentId, grandparentId1, grandparentId2...] + List<String> ancestorIDs + = getAncestorIdsForSplit(conn, tableName, streamName, regionInfoA, regionInfoB); + + upsertDaughterPartitions(conn, tableName, streamName, ancestorIDs.subList(0, 1), + Arrays.asList(regionInfoA, regionInfoB)); + + updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, + regionInfoA.getRegionId()); + } else { + LOGGER.info("{} does not have a stream enabled, skipping partition metadata update.", + regionInfoA.getTable()); } } catch (SQLException e) { LOGGER.error("Unable to update CDC Stream Partition metadata during split with daughter regions: {} {}", regionInfoA.getEncodedName(), regionInfoB.getEncodedName(), e); } } - private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException { - PTable pTable; - try { - pTable = PhoenixRuntime.getTable(conn, tableName.toString()); - } catch (TableNotFoundException e) { - return null; + /** + * Update parent -> daughter relationship for CDC Streams when regions merge. + * - upsert partition metadata for the daughter with each parent + * - update the end time on all the parents' partition metadata + * @param c the environment to interact with the framework and master + * @param regionsToMerge parent regions which merged + * @param mergedRegion daughter region + */ + @Override + public void postCompletedMergeRegionsAction(final ObserverContext<MasterCoprocessorEnvironment> c, + final RegionInfo[] regionsToMerge, + final RegionInfo mergedRegion) { + Configuration conf = c.getEnvironment().getConfiguration(); + try (Connection conn = QueryUtil.getConnectionOnServer(conf)) { + // CDC will be enabled on Phoenix tables only + PTable phoenixTable = getPhoenixTable(conn, mergedRegion.getTable()); + if (phoenixTable == null) { + LOGGER.info("{} is not a Phoenix Table, skipping partition metadata update.", + mergedRegion.getTable()); + return; + } + String tableName = phoenixTable.getName().getString(); + String streamName = getStreamName(conn, tableName); + if (streamName != null) { + LOGGER.info("Updating merged partition metadata for table={}, stream={} daughter {}", + tableName, streamName, mergedRegion.getEncodedName()); + // upsert a row for daughter-parent for each merged region + upsertDaughterPartitions(conn, tableName, streamName, + Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName).collect(Collectors.toList()), + Arrays.asList(mergedRegion)); + + // lookup all ancestors of a merged region and update the endTime + for (RegionInfo ri : regionsToMerge) { + List<String> ancestorIDs = getAncestorIdsForMerge(conn, tableName, streamName, ri); + updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, + mergedRegion.getRegionId()); Review Comment: Nit optimization: In case of merge, the parent will appear multiple times, but needs to be updated only once. ########## phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java: ########## @@ -173,50 +219,113 @@ private List<String> getAncestorIds(Connection conn, String tableName, String st Bytes.toStringBinary(regionInfoB.getStartKey()), Bytes.toStringBinary(regionInfoB.getEndKey()))); } + // if parent was a result of a merge, there will be multiple grandparents. + while (rs.next()) { + ancestorIDs.add(rs.getString(2)); + } return ancestorIDs; } /** - * Insert partition metadata for a daughter region from the split. + * Lookup the parent of a merged region. + * If the merged region was an output of a merge in the past, it will have multiple parents. */ - private void upsertDaughterPartition(Connection conn, String tableName, - String streamName, String parentPartitionID, - RegionInfo regionInfo) - throws SQLException { - String partitionId = regionInfo.getEncodedName(); - long startTime = regionInfo.getRegionId(); - byte[] startKey = regionInfo.getStartKey(); - byte[] endKey = regionInfo.getEndKey(); - PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL); + private List<String> getAncestorIdsForMerge(Connection conn, String tableName, String streamName, + RegionInfo parent) throws SQLException { + List<String> ancestorIDs = new ArrayList<>(); + ancestorIDs.add(parent.getEncodedName()); + PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE); pstmt.setString(1, tableName); pstmt.setString(2, streamName); - pstmt.setString(3, partitionId); - pstmt.setString(4, parentPartitionID); - pstmt.setLong(5, startTime); - // endTime in not set when inserting a new partition - pstmt.setNull(6, Types.BIGINT); - pstmt.setBytes(7, startKey.length == 0 ? null : startKey); - pstmt.setBytes(8, endKey.length == 0 ? null : endKey); - pstmt.executeUpdate(); + pstmt.setString(3, parent.getEncodedName()); + ResultSet rs = pstmt.executeQuery(); + if (rs.next()) { + ancestorIDs.add(rs.getString(1)); + } else { + throw new SQLException(String.format( + "Could not find parent of the provided merged region: {}", parent.getEncodedName())); + } + // if parent was a result of a merge, there will be multiple grandparents. + while (rs.next()) { + ancestorIDs.add(rs.getString(1)); + } + return ancestorIDs; + } + + /** + * Insert partition metadata for a daughter region from a split or a merge. + * split: 2 daughters, 1 parent + * merge: 1 daughter, N parents + */ + private void upsertDaughterPartitions(Connection conn, String tableName, + String streamName, List<String> parentPartitionIDs, + List<RegionInfo> daughters) + throws SQLException { + PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL); + for (RegionInfo daughter : daughters) { + for (String parentPartitionID : parentPartitionIDs) { + String partitionId = daughter.getEncodedName(); + long startTime = daughter.getRegionId(); + byte[] startKey = daughter.getStartKey(); + byte[] endKey = daughter.getEndKey(); + pstmt.setString(1, tableName); + pstmt.setString(2, streamName); + pstmt.setString(3, partitionId); + pstmt.setString(4, parentPartitionID); + pstmt.setLong(5, startTime); + // endTime in not set when inserting a new partition + pstmt.setNull(6, Types.BIGINT); + pstmt.setBytes(7, startKey.length == 0 ? null : startKey); + pstmt.setBytes(8, endKey.length == 0 ? null : endKey); + pstmt.executeUpdate(); + } + } conn.commit(); Review Comment: Isn't the connection by default set to `autocommit: true` per JDBC standard? It is better to have a private util function that wraps `QueryUtil.getConnectionOnServer` and turns off autocommit instead of doing it locally in the 2 methods that do upsert. ########## phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java: ########## @@ -173,50 +219,113 @@ private List<String> getAncestorIds(Connection conn, String tableName, String st Bytes.toStringBinary(regionInfoB.getStartKey()), Bytes.toStringBinary(regionInfoB.getEndKey()))); } + // if parent was a result of a merge, there will be multiple grandparents. + while (rs.next()) { + ancestorIDs.add(rs.getString(2)); + } Review Comment: What is the situation in which a split will have multiple ancestors? In fact, isn't it an error if we actually find 2 regions with exactly the same start and end keys? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org