This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 30801d9a65 PHOENIX-7499 : Update stream metadata when data table regions merge (#2057)
30801d9a65 is described below

commit 30801d9a652fc44a1444af1b54c5ed11d0153fc6
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Feb 6 14:20:04 2025 -0800

    PHOENIX-7499 : Update stream metadata when data table regions merge (#2057)
    
    Co-authored-by: Palash Chauhan <[email protected]>
---
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   1 +
 .../org/apache/phoenix/query/QueryConstants.java   |   2 +
 .../phoenix/coprocessor/PhoenixMasterObserver.java | 219 ++++++++++++++++-----
 .../org/apache/phoenix/end2end/CDCStreamIT.java    | 206 +++++++++++++++----
 .../java/org/apache/phoenix/util/TestUtil.java     |  83 +++++++-
 5 files changed, 412 insertions(+), 99 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index ff250f40ba..1a4b2996b7 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -461,6 +461,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
             SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CDC_STREAM_TABLE);
     public static final String STREAM_NAME = "STREAM_NAME";
     public static final String STREAM_STATUS = "STREAM_STATUS";
+    public static final String STREAM_TYPE = "STREAM_TYPE";
     public static final String PARTITION_ID = "PARTITION_ID";
     public static final String PARENT_PARTITION_ID = "PARENT_PARTITION_ID";
     public static final String PARTITION_START_TIME = "PARTITION_START_TIME";
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index d59249fb58..02589a9265 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -142,6 +142,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_STATUS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE;
@@ -658,6 +659,7 @@ public interface QueryConstants {
             STREAM_NAME + " VARCHAR NOT NULL," +
             // Non-PK columns
             STREAM_STATUS + " VARCHAR,\n" +
+            STREAM_TYPE + " VARCHAR,\n" +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
             TABLE_NAME + "," + STREAM_NAME + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index f51cf7c154..e335d7604a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -23,8 +23,10 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -60,9 +62,13 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
     private static final String PARTITION_UPSERT_SQL
             = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,?,?,?,?,?)";
 
-    private static final String PARENT_PARTITION_QUERY
+    private static final String PARENT_PARTITION_QUERY_FOR_SPLIT
             = "SELECT PARTITION_ID, PARENT_PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
-            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
+            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_END_TIME IS NULL ";
+
+    private static final String PARENT_PARTITION_QUERY_FOR_MERGE
+            = "SELECT PARENT_PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
+            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_ID = ?";
 
     private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
             = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, STREAM_NAME, PARTITION_ID, "
@@ -74,7 +80,7 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
     }
 
     /**
-     * Update parent -> daughter relationship for CDC Streams.
+     * Update parent -> daughter relationship for CDC Streams when a region splits.
      * - find parent partition id using start/end keys of daughters
      * - upsert partition metadata for the 2 daughters
      * - update the end time on the parent's partition metadata
@@ -95,20 +101,23 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
                         regionInfoA.getTable());
                 return;
             }
-            // find streamName with ENABLED status
             String tableName = phoenixTable.getName().getString();
-            PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
-            pstmt.setString(1, tableName);
-            ResultSet rs = pstmt.executeQuery();
-            if (rs.next()) {
-                String streamName = rs.getString(1);
-                LOGGER.info("Updating partition metadata for table={}, stream={} daughters {} {}",
+            String streamName = getStreamName(conn, tableName);
+            if (streamName != null) {
+                LOGGER.info("Updating split partition metadata for table={}, stream={} daughters {} {}",
                         tableName, streamName, regionInfoA.getEncodedName(), regionInfoB.getEncodedName());
-                // ancestorIDs = [parentId, grandparentId1]
-                List<String> ancestorIDs = getAncestorIds(conn, tableName, streamName, regionInfoA, regionInfoB);
-                upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoA);
-                upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoB);
-                updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, regionInfoA.getRegionId());
+                // ancestorIDs = [parentId, grandparentId1, grandparentId2...]
+                List<String> ancestorIDs
+                        = getAncestorIdsForSplit(conn, tableName, streamName, regionInfoA, regionInfoB);
+
+                upsertDaughterPartitions(conn, tableName, streamName, ancestorIDs.subList(0, 1),
+                        Arrays.asList(regionInfoA, regionInfoB));
+
+                updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs,
+                        regionInfoA.getRegionId());
+            } else {
+                LOGGER.info("{} does not have a stream enabled, skipping partition metadata update.",
+                        regionInfoA.getTable());
             }
         } catch (SQLException e) {
             LOGGER.error("Unable to update CDC Stream Partition metadata during split with daughter regions: {} {}",
@@ -116,31 +125,68 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
         }
     }
 
-    private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException {
-        PTable pTable;
-        try {
-            pTable = PhoenixRuntime.getTable(conn, tableName.toString());
-        } catch (TableNotFoundException e) {
-            return null;
+    /**
+     * Update parent -> daughter relationship for CDC Streams when regions merge.
+     * - upsert partition metadata for the daughter with each parent
+     * - update the end time on all the parents' partition metadata
+     * @param c the environment to interact with the framework and master
+     * @param regionsToMerge parent regions which merged
+     * @param mergedRegion daughter region
+     */
+    @Override
+    public void postCompletedMergeRegionsAction(final ObserverContext<MasterCoprocessorEnvironment> c,
+                                                final RegionInfo[] regionsToMerge,
+                                                final RegionInfo mergedRegion) {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        try (Connection conn  = QueryUtil.getConnectionOnServer(conf)) {
+            // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, mergedRegion.getTable());
+            if (phoenixTable == null) {
+                LOGGER.info("{} is not a Phoenix Table, skipping partition metadata update.",
+                        mergedRegion.getTable());
+                return;
+            }
+            String tableName = phoenixTable.getName().getString();
+            String streamName = getStreamName(conn, tableName);
+            if (streamName != null) {
+                LOGGER.info("Updating merged partition metadata for table={}, stream={} daughter {}",
+                        tableName, streamName, mergedRegion.getEncodedName());
+                // upsert a row for daughter-parent for each merged region
+                upsertDaughterPartitions(conn, tableName, streamName,
+                        Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName).collect(Collectors.toList()),
+                        Arrays.asList(mergedRegion));
+
+                // lookup all ancestors of a merged region and update the endTime
+                for (RegionInfo ri : regionsToMerge) {
+                    List<String> ancestorIDs = getAncestorIdsForMerge(conn, tableName, streamName, ri);
+                    updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs,
+                            mergedRegion.getRegionId());
+                }
+            } else {
+                LOGGER.info("{} does not have a stream enabled, skipping partition metadata update.",
+                        mergedRegion.getTable());
+            }
+        } catch (SQLException e) {
+            LOGGER.error("Unable to update CDC Stream Partition metadata during merge with " +
+                            "parent regions: {} and daughter region {}",
+                    regionsToMerge, mergedRegion.getEncodedName(), e);
         }
-        return pTable;
     }
 
     /**
-     * Lookup parent's partition id (region's encoded name) in SYSTEM.CDC_STREAM.
+     * Lookup a split parent's partition id (region's encoded name) in SYSTEM.CDC_STREAM.
      * RegionInfoA is left daughter and RegionInfoB is right daughter so parent's key range would
      * be [RegionInfoA startKey, RegionInfoB endKey]
-     * Return both parent and grandparent partition ids.
+     * Return parent and all grandparent partition ids.
      *
-     * TODO: When we implement merges in this coproc, there could be multiple grandparents.
      */
-    private List<String> getAncestorIds(Connection conn, String tableName, String streamName,
+    private List<String> getAncestorIdsForSplit(Connection conn, String tableName, String streamName,
                                         RegionInfo regionInfoA, RegionInfo regionInfoB)
             throws SQLException {
         byte[] parentStartKey = regionInfoA.getStartKey();
         byte[] parentEndKey = regionInfoB.getEndKey();
 
-        StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY);
+        StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY_FOR_SPLIT);
         if (parentStartKey.length == 0) {
             qb.append(" AND PARTITION_START_KEY IS NULL ");
         } else {
@@ -173,50 +219,115 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
                     Bytes.toStringBinary(regionInfoB.getStartKey()),
                     Bytes.toStringBinary(regionInfoB.getEndKey())));
         }
+        // if parent was a result of a merge, there will be multiple grandparents.
+        while (rs.next()) {
+            ancestorIDs.add(rs.getString(2));
+        }
         return ancestorIDs;
     }
 
     /**
-     * Insert partition metadata for a daughter region from the split.
+     * Lookup the parent of a merged region.
+     * If the merged region was an output of a merge in the past, it will have multiple parents.
      */
-    private void upsertDaughterPartition(Connection conn, String tableName,
-                                         String streamName, String parentPartitionID,
-                                         RegionInfo regionInfo)
-            throws SQLException {
-        String partitionId = regionInfo.getEncodedName();
-        long startTime = regionInfo.getRegionId();
-        byte[] startKey = regionInfo.getStartKey();
-        byte[] endKey = regionInfo.getEndKey();
-        PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
+    private List<String> getAncestorIdsForMerge(Connection conn, String tableName, String streamName,
+                                                RegionInfo parent) throws SQLException {
+        List<String> ancestorIDs = new ArrayList<>();
+        ancestorIDs.add(parent.getEncodedName());
+        PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE);
         pstmt.setString(1, tableName);
         pstmt.setString(2, streamName);
-        pstmt.setString(3, partitionId);
-        pstmt.setString(4, parentPartitionID);
-        pstmt.setLong(5, startTime);
-        // endTime in not set when inserting a new partition
-        pstmt.setNull(6, Types.BIGINT);
-        pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
-        pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
-        pstmt.executeUpdate();
+        pstmt.setString(3, parent.getEncodedName());
+        ResultSet rs = pstmt.executeQuery();
+        if (rs.next()) {
+            ancestorIDs.add(rs.getString(1));
+        } else {
+            throw new SQLException(String.format(
+                    "Could not find parent of the provided merged region: {}", 
parent.getEncodedName()));
+        }
+        // if parent was a result of a merge, there will be multiple grandparents.
+        while (rs.next()) {
+            ancestorIDs.add(rs.getString(1));
+        }
+        return ancestorIDs;
+    }
+
+    /**
+     * Insert partition metadata for a daughter region from a split or a merge.
+     * split: 2 daughters, 1 parent
+     * merge: 1 daughter, N parents
+     */
+    private void upsertDaughterPartitions(Connection conn, String tableName,
+                                         String streamName, List<String> parentPartitionIDs,
+                                         List<RegionInfo> daughters)
+            throws SQLException {
+        conn.setAutoCommit(false);
+        PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
+        for (RegionInfo daughter : daughters) {
+            for (String parentPartitionID : parentPartitionIDs) {
+                String partitionId = daughter.getEncodedName();
+                long startTime = daughter.getRegionId();
+                byte[] startKey = daughter.getStartKey();
+                byte[] endKey = daughter.getEndKey();
+                pstmt.setString(1, tableName);
+                pstmt.setString(2, streamName);
+                pstmt.setString(3, partitionId);
+                pstmt.setString(4, parentPartitionID);
+                pstmt.setLong(5, startTime);
+                // endTime is not set when inserting a new partition
+                pstmt.setNull(6, Types.BIGINT);
+                pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
+                pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
+                pstmt.executeUpdate();
+            }
+        }
         conn.commit();
     }
 
     /**
      * Update endTime in all rows of parent partition by setting it to daughter's startTime.
+     * parent came from a split : there will only be one record
+     * parent came from a merge : there will be multiple rows, one per grandparent
      *
-     * TODO: When we implement merges in this coproc, update all rows of the parent.
      */
     private void updateParentPartitionEndTime(Connection conn, String tableName,
                                               String streamName, List<String> ancestorIDs,
-                                          long daughterStartTime) throws SQLException {
-        // ancestorIDs = [parentID, grandparentID]
+                                              long daughterStartTime) throws SQLException {
+        conn.setAutoCommit(false);
+        // ancestorIDs = [parentID, grandparentID1, grandparentID2...]
         PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL);
-        pstmt.setString(1, tableName);
-        pstmt.setString(2, streamName);
-        pstmt.setString(3, ancestorIDs.get(0));
-        pstmt.setString(4, ancestorIDs.get(1));
-        pstmt.setLong(5, daughterStartTime);
-        pstmt.executeUpdate();
+        for (int i=1; i<ancestorIDs.size(); i++) {
+            pstmt.setString(1, tableName);
+            pstmt.setString(2, streamName);
+            pstmt.setString(3, ancestorIDs.get(0));
+            pstmt.setString(4, ancestorIDs.get(i));
+            pstmt.setLong(5, daughterStartTime);
+            pstmt.executeUpdate();
+        }
         conn.commit();
     }
+
+    /**
+     * Get the stream name on the given table if one exists in ENABLED state.
+     */
+    private String getStreamName(Connection conn, String tableName) throws SQLException {
+        PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
+        pstmt.setString(1, tableName);
+        ResultSet rs = pstmt.executeQuery();
+        if (rs.next()) {
+            return rs.getString(1);
+        } else {
+            return null;
+        }
+    }
+
+    private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException {
+        PTable pTable;
+        try {
+            pTable = PhoenixRuntime.getTable(conn, tableName.toString());
+        } catch (TableNotFoundException e) {
+            return null;
+        }
+        return pTable;
+    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index d70821bb94..98dd22f535 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -18,13 +18,8 @@
 
 package org.apache.phoenix.end2end;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
@@ -33,12 +28,12 @@ import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -49,8 +44,12 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
@@ -172,7 +171,7 @@ public class CDCStreamIT extends CDCBaseIT {
         createTableAndEnableCDC(conn, tableName);
 
         //split the only region somewhere in the middle
-        splitTable(conn, tableName, Bytes.toBytes("m"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
 
         //check partition metadata - daughter regions are inserted and parent's end time is updated.
         ResultSet rs = conn.createStatement().executeQuery(
@@ -209,9 +208,9 @@ public class CDCStreamIT extends CDCBaseIT {
         createTableAndEnableCDC(conn, tableName);
 
         //split the only region [null, null]
-        splitTable(conn, tableName, Bytes.toBytes("l"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
         // we have 2 regions - [null, l], [l, null], split the first region
-        splitTable(conn, tableName, Bytes.toBytes("d"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
         ResultSet rs = conn.createStatement().executeQuery(
                 "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + 
tableName + "'");
         PartitionMetadata grandparent = null, splitParent = null, 
unSplitParent = null;
@@ -253,9 +252,9 @@ public class CDCStreamIT extends CDCBaseIT {
         createTableAndEnableCDC(conn, tableName);
 
         //split the only region [null, null]
-        splitTable(conn, tableName, Bytes.toBytes("l"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
         // we have 2 regions - [null, l], [l, null], split the second region
-        splitTable(conn, tableName, Bytes.toBytes("q"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));
         ResultSet rs = conn.createStatement().executeQuery(
                 "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + 
tableName + "'");
         PartitionMetadata grandparent = null, splitParent = null, 
unSplitParent = null;
@@ -297,11 +296,11 @@ public class CDCStreamIT extends CDCBaseIT {
         createTableAndEnableCDC(conn, tableName);
 
         //split the only region [null, null]
-        splitTable(conn, tableName, Bytes.toBytes("d"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
         // we have 2 regions - [null, d], [d, null], split the second region
-        splitTable(conn, tableName, Bytes.toBytes("q"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));
         // we have 3 regions - [null, d], [d, q], [q, null], split the second region
-        splitTable(conn, tableName, Bytes.toBytes("j"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("j"));
         // [null, d], [d, j], [j, q], [q, null]
         ResultSet rs = conn.createStatement().executeQuery(
                 "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + 
tableName + "'");
@@ -324,6 +323,160 @@ public class CDCStreamIT extends CDCBaseIT {
         assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'j' && d.endKey[0] == 'q'));
     }
 
+    /**
+     * Test split of a region which came from a merge.
+     */
+    @Test
+    public void testPartitionMetadataMergedRegionSplits() throws Exception {
+        // create table, cdc and bootstrap stream metadata
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName);
+
+        //split the only region
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
+
+        //merge the 2 regions
+        List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
+        TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+
+        //split again
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
+
+        //verify partition metadata
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + 
tableName + "'");
+        List<PartitionMetadata> mergedParent = new ArrayList<>();
+        List<PartitionMetadata> splitDaughters = new ArrayList<>();
+
+        while (rs.next()) {
+            PartitionMetadata pm = new PartitionMetadata(rs);
+            if (pm.startKey == null && pm.endKey == null && pm.parentPartitionId != null) {
+                mergedParent.add(pm);
+            }
+            if (pm.endTime == 0) {
+                splitDaughters.add(pm);
+            }
+        }
+        assertEquals(2, mergedParent.size());
+        assertEquals(2, splitDaughters.size());
+        assertEquals(mergedParent.get(0).partitionId, mergedParent.get(1).partitionId);
+        assertEquals(mergedParent.get(0).partitionId, splitDaughters.get(0).parentPartitionId);
+        assertEquals(mergedParent.get(0).partitionId, splitDaughters.get(1).parentPartitionId);
+        assertEquals(splitDaughters.get(0).startTime, splitDaughters.get(1).startTime);
+        assertEquals(splitDaughters.get(0).startTime, mergedParent.get(0).endTime);
+        assertEquals(splitDaughters.get(0).startTime, mergedParent.get(1).endTime);
+    }
+
+
+
+    /**
+     * Test merge of 2 regions which came from a split.
+     */
+    @Test
+    public void testPartitionMetadataSplitRegionsMerge() throws Exception {
+        // create table, cdc and bootstrap stream metadata
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName);
+
+        //split the only region
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
+
+        //merge the 2 regions
+        List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
+        TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+                                                    .map(HRegionLocation::getRegion)
+                                                    .map(RegionInfo::getEncodedName)
+                                                    .collect(Collectors.toList()));
+
+        //verify partition metadata
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + 
tableName + "'");
+
+        List<PartitionMetadata> splitParents = new ArrayList<>();
+        List<PartitionMetadata> mergedDaughter = new ArrayList<>();
+        while (rs.next()) {
+            PartitionMetadata pm = new PartitionMetadata(rs);
+            if (pm.startKey == null && pm.endKey == null && pm.endTime == 0) {
+                mergedDaughter.add(pm);
+            }
+            if (pm.startKey != null || pm.endKey != null) {
+                splitParents.add(pm);
+            }
+        }
+        assertEquals(2, mergedDaughter.size());
+        assertEquals(2, splitParents.size());
+        assertEquals(mergedDaughter.get(0).startTime, mergedDaughter.get(1).startTime);
+        assertEquals(mergedDaughter.get(0).endTime, mergedDaughter.get(1).endTime);
+        assertEquals(mergedDaughter.get(0).partitionId, mergedDaughter.get(1).partitionId);
+        assertTrue(mergedDaughter.stream().anyMatch(d -> Objects.equals(d.parentPartitionId, splitParents.get(0).partitionId)));
+        assertTrue(mergedDaughter.stream().anyMatch(d -> Objects.equals(d.parentPartitionId, splitParents.get(1).partitionId)));
+        for (PartitionMetadata splitDaughter : splitParents) {
+            Assert.assertEquals(mergedDaughter.get(0).startTime, splitDaughter.endTime);
+        }
+    }
+
+    /**
+     * Test merge of 2 regions which came from different merges.
+     */
+    @Test
+    public void testPartitionMetadataMergedRegionsMerge() throws Exception {
+        // create table, cdc and bootstrap stream metadata
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName);
+
+        // split the only region
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
+        // split both regions
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));
+        // merge first two and last two regions
+        List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
+        TestUtil.mergeTableRegions(conn, tableName, regions.subList(0,2).stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+        TestUtil.mergeTableRegions(conn, tableName, regions.subList(2,4).stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+        // merge the two regions
+        regions = TestUtil.getAllTableRegions(conn, tableName);
+        TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+
+        //verify partition metadata
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + 
tableName + "'");
+
+        List<PartitionMetadata> mergedDaughter = new ArrayList<>();
+        List<PartitionMetadata> mergedParents = new ArrayList<>();
+        while (rs.next()) {
+            PartitionMetadata pm = new PartitionMetadata(rs);
+            if (pm.endTime == 0) {
+                mergedDaughter.add(pm);
+            }
+            // this will add extra rows, we will prune later
+            else if (pm.startKey == null || pm.endKey == null) {
+                mergedParents.add(pm);
+            }
+        }
+        assertEquals(2, mergedDaughter.size());
+        assertEquals(9, mergedParents.size());
+        assertEquals(mergedDaughter.get(0).startTime, mergedDaughter.get(1).startTime);
+        Collections.sort(mergedParents, Comparator.comparing(o -> o.endTime));
+        for (PartitionMetadata mergedParent : mergedParents.subList(mergedParents.size()-4, mergedParents.size())) {
+            assertEquals(mergedDaughter.get(0).startTime, mergedParent.endTime);
+        }
+    }
+
     private String getStreamName(Connection conn, String tableName, String cdcName) throws SQLException {
         return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName, CDCUtil.getCDCCreationTimestamp(
                 conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
@@ -381,31 +534,6 @@ public class CDCStreamIT extends CDCBaseIT {
         conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('z', 8, 'cat')");
     }
 
-    /**
-     * Split the table at the provided split point.
-     */
-    private void splitTable(Connection conn, String tableName, byte[] splitPoint) throws Exception {
-        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-        Admin admin = services.getAdmin();
-        Configuration configuration =
-                conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
-        org.apache.hadoop.hbase.client.Connection hbaseConn =
-                ConnectionFactory.createConnection(configuration);
-        RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
-        int nRegions = regionLocator.getAllRegionLocations().size();
-        try {
-            admin.split(TableName.valueOf(tableName), splitPoint);
-            int retryCount = 0;
-            do {
-                Thread.sleep(2000);
-                retryCount++;
-            } while (retryCount < 10 && regionLocator.getAllRegionLocations().size() == nRegions);
-            Assert.assertNotEquals(regionLocator.getAllRegionLocations().size(), nRegions);
-        } finally {
-            admin.close();
-        }
-    }
-
     /**
      * Inner class to represent partition metadata for a region i.e. single row from SYSTEM.CDC_STREAM
      */
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 6ea2a2eb65..969dc803cf 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -59,22 +59,26 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.CoprocessorDescriptor;
 import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
 
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -220,7 +224,7 @@ public class TestUtil {
     public final static String ROW7 = "00B723122312312";
     public final static String ROW8 = "00B823122312312";
     public final static String ROW9 = "00C923122312312";
-    
+
     public final static String PARENTID1 = "0500x0000000001";
     public final static String PARENTID2 = "0500x0000000002";
     public final static String PARENTID3 = "0500x0000000003";
@@ -230,9 +234,9 @@ public class TestUtil {
     public final static String PARENTID7 = "0500x0000000007";
     public final static String PARENTID8 = "0500x0000000008";
     public final static String PARENTID9 = "0500x0000000009";
-    
+
     public final static List<String> PARENTIDS = Lists.newArrayList(PARENTID1, PARENTID2, PARENTID3, PARENTID4, PARENTID5, PARENTID6, PARENTID7, PARENTID8, PARENTID9);
-    
+
     public final static String ENTITYHISTID1 = "017x00000000001";
     public final static String ENTITYHISTID2 = "017x00000000002";
     public final static String ENTITYHISTID3 = "017x00000000003";
@@ -244,7 +248,7 @@ public class TestUtil {
     public final static String ENTITYHISTID9 = "017x00000000009";
 
     public final static List<String> ENTITYHISTIDS = Lists.newArrayList(ENTITYHISTID1, ENTITYHISTID2, ENTITYHISTID3, ENTITYHISTID4, ENTITYHISTID5, ENTITYHISTID6, ENTITYHISTID7, ENTITYHISTID8, ENTITYHISTID9);
-    
+
     public static final String LOCALHOST = "localhost";
     public static final String PHOENIX_JDBC_URL = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
     public static final String PHOENIX_CONNECTIONLESS_JDBC_URL = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
@@ -897,7 +901,7 @@ public class TestUtil {
             if (table.isTransactional()) {
                 mutationState.commit();
             }
-        
+
             Admin hbaseAdmin = services.getAdmin();
             hbaseAdmin.flush(TableName.valueOf(tableName));
             hbaseAdmin.majorCompact(TableName.valueOf(tableName));
@@ -910,7 +914,7 @@ public class TestUtil {
                 scan.withStartRow(markerRowKey);
                 scan.withStopRow(Bytes.add(markerRowKey, new byte[]{0}));
                 scan.setRaw(true);
-        
+
                 try (Table htableForRawScan = services.getTable(Bytes.toBytes(tableName))) {
                     ResultScanner scanner = htableForRawScan.getScanner(scan);
                     List<Result> results = Lists.newArrayList(scanner);
@@ -1450,4 +1454,71 @@ public class TestUtil {
         return Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), null);
     }
 
+    /**
+     * Split the table at the provided split point.
+     */
+    public static void splitTable(Connection conn, String tableName, byte[] splitPoint)
+            throws Exception {
+        executeHBaseTableRegionOperation(conn, tableName, (admin, regionLocator, nRegions) -> {
+            admin.split(TableName.valueOf(tableName), splitPoint);
+            waitForRegionChange(regionLocator, nRegions);
+        });
+    }
+
+    /**
+     * Merge the given regions of a table.
+     */
+    public static void mergeTableRegions(Connection conn, String tableName, List<String> regions)
+            throws Exception {
+        byte[][] regionsToMerge = regions.stream()
+                .map(String::getBytes)
+                .toArray(byte[][]::new);
+
+        executeHBaseTableRegionOperation(conn, tableName, (admin, regionLocator, nRegions) -> {
+            admin.mergeRegionsAsync(regionsToMerge, true);
+            waitForRegionChange(regionLocator, nRegions);
+        });
+    }
+
+    @FunctionalInterface
+    private interface TableOperation {
+        void execute(Admin admin, RegionLocator regionLocator, int initialRegionCount)
+                throws Exception;
+    }
+
+    private static void executeHBaseTableRegionOperation(Connection conn, String tableName,
+                                                     TableOperation operation) throws Exception {
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        Configuration configuration = services.getConfiguration();
+        org.apache.hadoop.hbase.client.Connection hbaseConn
+                = ConnectionFactory.createConnection(configuration);
+        Admin admin = services.getAdmin();
+        RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+        int nRegions = regionLocator.getAllRegionLocations().size();
+        operation.execute(admin, regionLocator, nRegions);
+
+    }
+
+    private static void waitForRegionChange(RegionLocator regionLocator, int initialRegionCount)
+            throws Exception {
+        int retryCount = 0;
+        while (retryCount < 20
+                && regionLocator.getAllRegionLocations().size() == initialRegionCount) {
+            Thread.sleep(5000);
+            retryCount++;
+        }
+        Assert.assertNotEquals(regionLocator.getAllRegionLocations().size(), initialRegionCount);
+    }
+
+    public static List<HRegionLocation> getAllTableRegions(Connection conn, String tableName)
+            throws Exception {
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        Configuration configuration = services.getConfiguration();
+        RegionLocator regionLocator;
+        org.apache.hadoop.hbase.client.Connection hbaseConn
+                = ConnectionFactory.createConnection(configuration);
+        regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+        return regionLocator.getAllRegionLocations();
+    }
+
 }

