This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new a51a57a94b PHOENIX-7460 : Update CDC Stream metadata when a data table region splits (#2051)
a51a57a94b is described below

commit a51a57a94b431d977ee4106796a3e5222f3cbcd6
Author: Palash Chauhan <[email protected]>
AuthorDate: Mon Jan 6 20:08:51 2025 -0800

    PHOENIX-7460 : Update CDC Stream metadata when a data table region splits (#2051)
    
    Co-authored-by: Palash Chauhan <[email protected]>
---
 .../phoenix/coprocessor/PhoenixMasterObserver.java | 209 +++++++++++++++++
 .../tasks/CdcStreamPartitionMetadataTask.java      |  15 +-
 .../org/apache/phoenix/end2end/CDCStreamIT.java    | 252 ++++++++++++++++++++-
 3 files changed, 467 insertions(+), 9 deletions(-)
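
A minimal sketch of inspecting the partition metadata this commit maintains (the query and the column positions mirror the CDCStreamIT changes below; the JDBC URL and class name are illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class InspectStreamPartitions {
        public static void main(String[] args) throws Exception {
            // Assumes a local Phoenix quorum; adjust the JDBC URL as needed.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                ResultSet rs = conn.createStatement().executeQuery(
                        "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + args[0] + "'");
                while (rs.next()) {
                    // Columns 3..8: partition id, parent partition id, start/end time, start/end key.
                    System.out.printf("partition=%s parent=%s start=%d end=%d%n",
                            rs.getString(3), rs.getString(4), rs.getLong(5), rs.getLong(6));
                }
            }
        }
    }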

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
new file mode 100644
index 0000000000..80cfbc718d
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+
+/**
+ * Master Coprocessor for Phoenix.
+ */
+public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMasterObserver.class);
+
+    private static final String STREAM_STATUS_QUERY
+            = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME
+            + " WHERE TABLE_NAME = ? AND STREAM_STATUS='"
+            + CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue() + "'";
+
+    // tableName, streamName, partitionId, parentId, startTime, endTime, startKey, endKey
+    private static final String PARTITION_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,?,?,?,?,?)";
+
+    private static final String PARENT_PARTITION_QUERY
+            = "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
+            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
+
+    private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, 
STREAM_NAME, PARTITION_ID, "
+            + "PARTITION_END_TIME) VALUES (?,?,?,?)";
+
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+        return Optional.of(this);
+    }
+
+    /**
+     * Update parent -> daughter relationship for CDC Streams.
+     * - find parent partition id using start/end keys of daughters
+     * - upsert partition metadata for the 2 daughters
+     * - update the end time on the parent's partition metadata
+     * @param c           the environment to interact with the framework and master
+     * @param regionInfoA the left daughter region
+     * @param regionInfoB the right daughter region
+     */
+    @Override
+    public void postCompletedSplitRegionAction(final ObserverContext<MasterCoprocessorEnvironment> c,
+                                               final RegionInfo regionInfoA,
+                                               final RegionInfo regionInfoB) {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        try {
+            Connection conn = QueryUtil.getConnectionOnServer(conf);
+            // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, regionInfoA.getTable());
+            if (phoenixTable == null) {
+                LOGGER.info("{} is not a Phoenix Table, skipping partition 
metadata update.",
+                        regionInfoA.getTable());
+                return;
+            }
+            // find streamName with ENABLED status
+            String tableName = phoenixTable.getName().getString();
+            PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
+            pstmt.setString(1, tableName);
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                String streamName = rs.getString(1);
+                LOGGER.info("Updating partition metadata for table={}, 
stream={} daughters {} {}",
+                        tableName, streamName, regionInfoA.getEncodedName(), 
regionInfoB.getEncodedName());
+                String parentPartitionID = getParentPartitionId(conn, 
tableName, streamName, regionInfoA, regionInfoB);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoA);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoB);
+                updateParentPartitionEndTime(conn, tableName, streamName, 
parentPartitionID, regionInfoA.getRegionId());
+            }
+        } catch (SQLException e) {
+            LOGGER.error("Unable to update CDC Stream Partition metadata during split with daughter regions: {} {}",
+                    regionInfoA.getEncodedName(), regionInfoB.getEncodedName(), e);
+        }
+    }
+
+    private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException {
+        PTable pTable;
+        try {
+            pTable = PhoenixRuntime.getTable(conn, tableName.toString());
+        } catch (TableNotFoundException e) {
+            return null;
+        }
+        return pTable;
+    }
+
+    /**
+     * Look up the parent's partition id (the region's encoded name) in SYSTEM.CDC_STREAM.
+     * RegionInfoA is the left daughter and RegionInfoB is the right daughter, so the parent's
+     * key range would be [RegionInfoA startKey, RegionInfoB endKey].
+     */
+    private String getParentPartitionId(Connection conn, String tableName, String streamName,
+                                        RegionInfo regionInfoA, RegionInfo regionInfoB)
+            throws SQLException {
+        byte[] parentStartKey = regionInfoA.getStartKey();
+        byte[] parentEndKey = regionInfoB.getEndKey();
+
+        StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY);
+        if (parentStartKey.length == 0) {
+            qb.append(" AND PARTITION_START_KEY IS NULL ");
+        } else {
+            qb.append(" AND PARTITION_START_KEY = ? ");
+        }
+        if (parentEndKey.length == 0) {
+            qb.append(" AND PARTITION_END_KEY IS NULL ");
+        } else {
+            qb.append(" AND PARTITION_END_KEY = ? ");
+        }
+
+        PreparedStatement pstmt = conn.prepareStatement(qb.toString());
+        int index = 1;
+        pstmt.setString(index++, tableName);
+        pstmt.setString(index++, streamName);
+        if (parentStartKey.length > 0) pstmt.setBytes(index++, parentStartKey);
+        if (parentEndKey.length > 0) pstmt.setBytes(index++, parentEndKey);
+        LOGGER.info("Query to get parent partition id: " + pstmt);
+        ResultSet rs = pstmt.executeQuery();
+        if (rs.next()) {
+            return rs.getString(1);
+        } else {
+            throw new SQLException(String.format("Could not find parent of the provided daughters: "
+                            + "startKeyA=%s endKeyA=%s startKeyB=%s endKeyB=%s",
+                    Bytes.toStringBinary(regionInfoA.getStartKey()),
+                    Bytes.toStringBinary(regionInfoA.getEndKey()),
+                    Bytes.toStringBinary(regionInfoB.getStartKey()),
+                    Bytes.toStringBinary(regionInfoB.getEndKey())));
+        }
+    }
+
+    /**
+     * Insert partition metadata for a daughter region from the split.
+     */
+    private void upsertDaughterPartition(Connection conn, String tableName,
+                                         String streamName, String parentPartitionID,
+                                         RegionInfo regionInfo)
+            throws SQLException {
+        String partitionId = regionInfo.getEncodedName();
+        long startTime = regionInfo.getRegionId();
+        byte[] startKey = regionInfo.getStartKey();
+        byte[] endKey = regionInfo.getEndKey();
+        PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
+        pstmt.setString(1, tableName);
+        pstmt.setString(2, streamName);
+        pstmt.setString(3, partitionId);
+        pstmt.setString(4, parentPartitionID);
+        pstmt.setLong(5, startTime);
+        // endTime is not set when inserting a new partition
+        pstmt.setNull(6, Types.BIGINT);
+        pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
+        pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
+        pstmt.executeUpdate();
+        conn.commit();
+    }
+
+    /**
+     * Update parent partition's endTime by setting it to daughter's startTime.
+     */
+    private void updateParentPartitionEndTime(Connection conn, String tableName,
+                                              String streamName, String parentPartitionID,
+                                              long daughterStartTime) throws SQLException {
+        PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL);
+        pstmt.setString(1, tableName);
+        pstmt.setString(2, streamName);
+        pstmt.setString(3, parentPartitionID);
+        pstmt.setLong(4, daughterStartTime);
+        pstmt.executeUpdate();
+        conn.commit();
+    }
+}
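
Note that the observer only fires if it is registered with the HBase master. A minimal sketch of the wiring, using the same property the CDCStreamIT setup below sets through its test driver props (a production cluster would put the equivalent entry in hbase-site.xml):

    // Register PhoenixMasterObserver as a master coprocessor.
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName());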
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
index c84853481c..de709e8aef 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
@@ -37,6 +37,7 @@ import java.io.IOException;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.List;
 
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
@@ -55,9 +56,8 @@ public class CdcStreamPartitionMetadataTask extends BaseTask  {
     private static final String CDC_STREAM_STATUS_UPSERT_SQL
             = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, 
?)";
 
-    // parent_partition_id will be null, set partition_end_time to -1
     private static final String CDC_STREAM_PARTITION_UPSERT_SQL
-            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES 
(?,?,?,null,?,-1,?,?)";
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES 
(?,?,?,?,?,?,?,?)";
 
     @Override
     public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
@@ -136,13 +136,16 @@ public class CdcStreamPartitionMetadataTask extends BaseTask  {
             throws SQLException {
         try (PreparedStatement ps = pconn.prepareStatement(CDC_STREAM_PARTITION_UPSERT_SQL)) {
             for (HRegionLocation tableRegion : tableRegions) {
-                RegionInfo ri = tableRegion.getRegionInfo();
+                // set parent_partition_id, partition_end_time to null
+                RegionInfo ri = tableRegion.getRegion();
                 ps.setString(1, tableName);
                 ps.setString(2, streamName);
                 ps.setString(3, ri.getEncodedName());
-                ps.setLong(4, ri.getRegionId());
-                ps.setBytes(5, ri.getStartKey());
-                ps.setBytes(6, ri.getEndKey());
+                ps.setNull(4, Types.VARCHAR);
+                ps.setLong(5, ri.getRegionId());
+                ps.setNull(6, Types.BIGINT);
+                ps.setBytes(7, ri.getStartKey());
+                ps.setBytes(8, ri.getEndKey());
                 ps.executeUpdate();
             }
             pconn.commit();
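
With the bootstrap task now binding explicit SQL NULLs for parent_partition_id and partition_end_time (instead of hard-coding null and -1 in the statement), still-open partitions can be selected with an IS NULL predicate. A minimal sketch, assuming the PARTITION_END_TIME column name used by PhoenixMasterObserver above:

    // Partitions not yet closed by a split have no end time.
    String openPartitionsSql =
            "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_END_TIME IS NULL";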
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index 461ee8a7c0..d70821bb94 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -18,14 +18,22 @@
 
 package org.apache.phoenix.end2end;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -40,6 +48,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -47,9 +56,10 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
 import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
 public class CDCStreamIT extends CDCBaseIT {
     private static RegionCoprocessorEnvironment TaskRegionEnvironment;
 
@@ -62,6 +72,7 @@ public class CDCStreamIT extends CDCBaseIT {
                 Long.toString(Long.MAX_VALUE));
         props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
                 Long.toString(Long.MAX_VALUE));
+        props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
         TaskRegionEnvironment =
                 getUtility()
@@ -71,6 +82,7 @@ public class CDCStreamIT extends CDCBaseIT {
                         .get(0).getCoprocessorHost()
                         .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
     }
+
     @Test
     public void testStreamPartitionMetadataBootstrap() throws Exception {
         Connection conn = newConnection();
@@ -149,9 +161,172 @@ public class CDCStreamIT extends CDCBaseIT {
         assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLING);
     }
 
+    /**
+     * Split the only region of the table with empty start key and empty end key.
+     */
+    @Test
+    public void testPartitionMetadataTableWithSingleRegionSplits() throws Exception {
+        // create table, cdc and bootstrap stream metadata
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName);
+
+        //split the only region somewhere in the middle
+        splitTable(conn, tableName, Bytes.toBytes("m"));
+
+        //check partition metadata - daughter regions are inserted and parent's end time is updated.
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
+        PartitionMetadata parent = null;
+        List<PartitionMetadata> daughters = new ArrayList<>();
+        while (rs.next()) {
+            PartitionMetadata pm = new PartitionMetadata(rs);
+            // parent which was split
+            if (pm.endTime > 0) {
+                parent = pm;
+            } else {
+                daughters.add(pm);
+            }
+        }
+        assertNotNull(parent);
+        assertEquals(2, daughters.size());
+        assertEquals(daughters.get(0).startTime, parent.endTime);
+        assertEquals(daughters.get(1).startTime, parent.endTime);
+        assertEquals(parent.partitionId, daughters.get(0).parentPartitionId);
+        assertEquals(parent.partitionId, daughters.get(1).parentPartitionId);
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey == null && d.endKey != null && d.endKey[0] == 'm'));
+        assertTrue(daughters.stream().anyMatch(d -> d.endKey == null && d.startKey != null && d.startKey[0] == 'm'));
+    }
+
+    /**
+     * Split the first region of the table with empty start key.
+     */
+    @Test
+    public void testPartitionMetadataFirstRegionSplits() throws Exception {
+        // create table, cdc and bootstrap stream metadata
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName);
+
+        //split the only region [null, null]
+        splitTable(conn, tableName, Bytes.toBytes("l"));
+        // we have 2 regions - [null, l], [l, null], split the first region
+        splitTable(conn, tableName, Bytes.toBytes("d"));
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + 
tableName + "'");
+        PartitionMetadata grandparent = null, splitParent = null, unSplitParent = null;
+        List<PartitionMetadata> daughters = new ArrayList<>();
+        while (rs.next()) {
+            PartitionMetadata pm = new PartitionMetadata(rs);
+            if (pm.endTime > 0) {
+                if (pm.startKey == null && pm.endKey == null) {
+                    grandparent = pm;
+                } else {
+                    splitParent = pm;
+                }
+            } else if (pm.endKey == null) {
+                unSplitParent = pm;
+            } else {
+                daughters.add(pm);
+            }
+        }
+        assertNotNull(grandparent);
+        assertNotNull(unSplitParent);
+        assertNotNull(splitParent);
+        assertEquals(2, daughters.size());
+        assertEquals(daughters.get(0).startTime, splitParent.endTime);
+        assertEquals(daughters.get(1).startTime, splitParent.endTime);
+        assertEquals(splitParent.partitionId, daughters.get(0).parentPartitionId);
+        assertEquals(splitParent.partitionId, daughters.get(1).parentPartitionId);
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey == null && d.endKey != null && d.endKey[0] == 'd'));
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey != null && d.startKey[0] == 'd' && d.endKey[0] == 'l'));
+    }
+
+    /**
+     * Split the last region of the table with empty end key.
+     */
+    @Test
+    public void testPartitionMetadataLastRegionSplits() throws Exception {
+        // create table, cdc and bootstrap stream metadata
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName);
+
+        //split the only region [null, null]
+        splitTable(conn, tableName, Bytes.toBytes("l"));
+        // we have 2 regions - [null, l], [l, null], split the second region
+        splitTable(conn, tableName, Bytes.toBytes("q"));
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + 
tableName + "'");
+        PartitionMetadata grandparent = null, splitParent = null, 
unSplitParent = null;
+        List<PartitionMetadata> daughters = new ArrayList<>();
+        while (rs.next()) {
+            PartitionMetadata pm = new PartitionMetadata(rs);
+            if (pm.endTime > 0) {
+                if (pm.startKey == null && pm.endKey == null) {
+                    grandparent = pm;
+                } else {
+                    splitParent = pm;
+                }
+            } else if (pm.startKey == null) {
+                unSplitParent = pm;
+            } else {
+                daughters.add(pm);
+            }
+        }
+        assertNotNull(grandparent);
+        assertNotNull(unSplitParent);
+        assertNotNull(splitParent);
+        assertEquals(2, daughters.size());
+        assertEquals(daughters.get(0).startTime, splitParent.endTime);
+        assertEquals(daughters.get(1).startTime, splitParent.endTime);
+        assertEquals(splitParent.partitionId, daughters.get(0).parentPartitionId);
+        assertEquals(splitParent.partitionId, daughters.get(1).parentPartitionId);
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'l' && d.endKey[0] == 'q'));
+        assertTrue(daughters.stream().anyMatch(d -> d.endKey == null && d.startKey != null && d.startKey[0] == 'q'));
+    }
+
+    /**
+     * Split a middle region of the table with non-empty start/end key.
+     */
+    @Test
+    public void testPartitionMetadataMiddleRegionSplits() throws Exception {
+        // create table, cdc and bootstrap stream metadata
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName);
+
+        //split the only region [null, null]
+        splitTable(conn, tableName, Bytes.toBytes("d"));
+        // we have 2 regions - [null, d], [d, null], split the second region
+        splitTable(conn, tableName, Bytes.toBytes("q"));
+        // we have 3 regions - [null, d], [d, q], [q, null], split the second region
+        splitTable(conn, tableName, Bytes.toBytes("j"));
+        // [null, d], [d, j], [j, q], [q, null]
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + 
tableName + "'");
+        PartitionMetadata parent = null;
+        List<PartitionMetadata> daughters = new ArrayList<>();
+        while (rs.next()) {
+            PartitionMetadata pm = new PartitionMetadata(rs);
+            if (pm.startKey != null && pm.endKey != null) {
+                if (pm.endTime > 0) parent = pm;
+                else daughters.add(pm);
+            }
+        }
+        assertNotNull(parent);
+        assertEquals(2, daughters.size());
+        assertEquals(daughters.get(0).startTime, parent.endTime);
+        assertEquals(daughters.get(1).startTime, parent.endTime);
+        assertEquals(parent.partitionId, daughters.get(0).parentPartitionId);
+        assertEquals(parent.partitionId, daughters.get(1).parentPartitionId);
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'd' && d.endKey[0] == 'j'));
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'j' && d.endKey[0] == 'q'));
+    }
+
     private String getStreamName(Connection conn, String tableName, String cdcName) throws SQLException {
         return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName, CDCUtil.getCDCCreationTimestamp(
-                        conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+                conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
     }
 
     private void assertStreamStatus(Connection conn, String tableName, String streamName,
@@ -160,7 +335,7 @@ public class CDCStreamIT extends CDCBaseIT {
                 + SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME='" + tableName +
                 "' AND STREAM_NAME='" + streamName + "'");
         assertTrue(rs.next());
-        Assert.assertEquals(status.getSerializedValue(), rs.getString(1));
+        assertEquals(status.getSerializedValue(), rs.getString(1));
     }
 
     private void assertPartitionMetadata(Connection conn, String tableName, String cdcName)
@@ -180,4 +355,75 @@ public class CDCStreamIT extends CDCBaseIT {
             assertTrue(rs.next());
         }
     }
+
+    private void createTableAndEnableCDC(Connection conn, String tableName) throws Exception {
+        String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k VARCHAR PRIMARY KEY," + " 
v1 INTEGER,"
+                        + " v2 VARCHAR)");
+        createCDC(conn, cdc_sql, null);
+        String streamName = getStreamName(conn, tableName, cdcName);
+        TaskRegionObserver.SelfHealingTask task =
+                new TaskRegionObserver.SelfHealingTask(
+                        TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+        task.run();
+        assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLED);
+
+        //upsert sample data
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('a', 1, 'foo')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('b', 2, 'bar')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('e', 3, 'alice')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('j', 4, 'bob')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('m', 5, 'cat')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('p', 6, 'cat')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('t', 7, 'cat')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('z', 8, 'cat')");
+    }
+
+    /**
+     * Split the table at the provided split point.
+     */
+    private void splitTable(Connection conn, String tableName, byte[] splitPoint) throws Exception {
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        Admin admin = services.getAdmin();
+        Configuration configuration =
+                conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
+        org.apache.hadoop.hbase.client.Connection hbaseConn =
+                ConnectionFactory.createConnection(configuration);
+        RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+        int nRegions = regionLocator.getAllRegionLocations().size();
+        try {
+            admin.split(TableName.valueOf(tableName), splitPoint);
+            int retryCount = 0;
+            do {
+                Thread.sleep(2000);
+                retryCount++;
+            } while (retryCount < 10 && regionLocator.getAllRegionLocations().size() == nRegions);
+            Assert.assertNotEquals(regionLocator.getAllRegionLocations().size(), nRegions);
+        } finally {
+            admin.close();
+        }
+    }
+
+    /**
+     * Inner class to represent partition metadata for a region, i.e. a single row from SYSTEM.CDC_STREAM.
+     */
+    private class PartitionMetadata {
+        public String partitionId;
+        public String parentPartitionId;
+        public Long startTime;
+        public Long endTime;
+        public byte[] startKey;
+        public byte[] endKey;
+
+        public PartitionMetadata(ResultSet rs) throws SQLException {
+            partitionId = rs.getString(3);
+            parentPartitionId = rs.getString(4);
+            startTime = rs.getLong(5);
+            endTime = rs.getLong(6);
+            startKey = rs.getBytes(7);
+            endKey = rs.getBytes(8);
+        }
+    }
 }
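
Taken together, the flow the new tests exercise can be sketched as follows (the DDL and split mechanics follow the helpers above; the table name, CDC name, and split point are illustrative):

    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
        conn.createStatement().execute(
                "CREATE TABLE T1 (k VARCHAR PRIMARY KEY, v1 INTEGER, v2 VARCHAR)");
        conn.createStatement().execute("CREATE CDC CDC1 ON T1");
        // ... wait for CdcStreamPartitionMetadataTask to mark the stream ENABLED ...
        try (org.apache.hadoop.hbase.client.Connection hbaseConn =
                     ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = hbaseConn.getAdmin()) {
            admin.split(TableName.valueOf("T1"), Bytes.toBytes("m"));
        }
        // PhoenixMasterObserver records both daughter partitions and closes the parent:
        ResultSet rs = conn.createStatement().executeQuery(
                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='T1'");
    }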
