virajjasani commented on code in PR #2051:
URL: https://github.com/apache/phoenix/pull/2051#discussion_r1902201356


##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java:
##########
@@ -0,0 +1,187 @@
+package org.apache.phoenix.coprocessor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+
+/**
+ * Master Coprocessor for Phoenix.
+ */
+public class PhoenixMasterObserver implements MasterObserver, 
MasterCoprocessor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixMasterObserver.class);
+
+    private static final String STREAM_STATUS_QUERY
+            = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME +
+            "WHERE TABLE_NAME = ? AND STREAM_STATUS='"
+            + CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue() + "'";
+
+    // tableName, streamName, partitionId, parentId, startTime, endTime, 
startKey, endKey
+    private static final String PARTITION_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + "VALUES 
(?,?,?,?,?,?,?,?)";
+
+    private static final String PARENT_PARTITION_QUERY
+            = "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME +
+            " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
+
+    private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
+            = "UPSERT INTO" + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, 
STREAM_NAME, PARTITION_ID, " +
+            "PARTITION_END_TIME) VALUES (?,?,?,?)";
+
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+        return Optional.of(this);
+    }
+
+    /**
+     * Update parent -> daughter relationship for CDC Streams.
+     * - find parent partition id using start/end keys of daughters
+     * - upsert partition metadata for the 2 daughters
+     * - update the end time on the parent's partition metadata
+     * @param c           the environment to interact with the framework and 
master
+     * @param regionInfoA the left daughter region
+     * @param regionInfoB the right daughter region
+     */
+    @Override
+    public void postCompletedSplitRegionAction(final 
ObserverContext<MasterCoprocessorEnvironment> c,
+                                               final RegionInfo regionInfoA, 
final RegionInfo regionInfoB) {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        try {
+            Connection conn  = QueryUtil.getConnectionOnServer(conf);
+            // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, 
regionInfoA.getTable());
+            if (phoenixTable == null) {
+                LOGGER.info("Not a Phoenix Table, skipping partition metadata 
update.");
+                return;
+            }
+            // find streamName with ENABLED status
+            String tableName = phoenixTable.getName().getString();
+            PreparedStatement pstmt = 
conn.prepareStatement(STREAM_STATUS_QUERY);
+            pstmt.setString(1, tableName);
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                String streamName = rs.getString(1);
+                String parentPartitionID = getParentPartitionId(conn, 
tableName, streamName, regionInfoA, regionInfoB);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoA);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoB);
+                updateParentPartitionEndTime(conn, tableName, streamName, 
parentPartitionID, regionInfoA.getRegionId());
+            }
+        } catch (SQLException e) {
+            LOGGER.error("Unable to update CDC Stream Partition metadata: " + 
e);
+        }
+    }
+
+    private PTable getPhoenixTable(Connection conn, TableName tableName) 
throws SQLException {
+        PTable pTable;
+        try {
+            pTable = PhoenixRuntime.getTable(conn, tableName.toString());
+        } catch (TableNotFoundException e) {
+            return null;
+        }
+        return pTable;
+    }
+
+    /**
+     * Lookup parent's partition id (region's encoded name) in 
SYSTEM.CDC_STREAM.
+     * We infer start and end key for the parent from the two daughters' 
start/end keys.
+     */
+    private String getParentPartitionId(Connection conn, String tableName, 
String streamName,
+                                        RegionInfo regionInfoA, RegionInfo 
regionInfoB)
+            throws SQLException {
+        byte[] inferredParentStartKey =
+                Bytes.compareTo(regionInfoA.getStartKey(), 
regionInfoB.getStartKey()) < 0 ?
+                regionInfoA.getStartKey() : regionInfoB.getStartKey();
+
+        byte[] inferredParentEndKey = (regionInfoB.getEndKey().length == 0 || 
regionInfoA.getEndKey().length == 0)
+                ? new byte[0]
+                : Bytes.compareTo(regionInfoA.getEndKey(), 
regionInfoB.getEndKey()) < 0
+                    ? regionInfoB.getEndKey()
+                    : regionInfoA.getEndKey();
+
+        StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY);
+        if (inferredParentStartKey.length == 0) {
+            qb.append(" AND PARTITION_START_KEY IS NULL ");
+        } else {
+            qb.append(" AND PARTITION_START_KEY = ? ");
+        }
+        if (inferredParentEndKey.length == 0) {
+            qb.append(" AND PARTITION_END_KEY IS NULL ");
+        } else {
+            qb.append(" AND PARTITION_END_KEY = ? ");
+        }
+
+        PreparedStatement pstmt = conn.prepareStatement(qb.toString());
+        int index = 1;
+        pstmt.setString(index++, tableName);
+        pstmt.setString(index++, streamName);
+        if (inferredParentStartKey.length > 0) pstmt.setBytes(index++, 
inferredParentStartKey);
+        if (inferredParentEndKey.length > 0) pstmt.setBytes(index++, 
inferredParentEndKey);
+        LOGGER.info(pstmt.toString());

Review Comment:
   nit: let's add a descriptive string message to this log statement (rather than logging only the prepared statement) to help with debugging



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java:
##########
@@ -0,0 +1,187 @@
+package org.apache.phoenix.coprocessor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+
+/**
+ * Master Coprocessor for Phoenix.
+ */
+public class PhoenixMasterObserver implements MasterObserver, 
MasterCoprocessor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixMasterObserver.class);
+
+    private static final String STREAM_STATUS_QUERY
+            = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME +
+            "WHERE TABLE_NAME = ? AND STREAM_STATUS='"
+            + CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue() + "'";
+
+    // tableName, streamName, partitionId, parentId, startTime, endTime, 
startKey, endKey
+    private static final String PARTITION_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + "VALUES 
(?,?,?,?,?,?,?,?)";
+
+    private static final String PARENT_PARTITION_QUERY
+            = "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME +
+            " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
+
+    private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
+            = "UPSERT INTO" + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, 
STREAM_NAME, PARTITION_ID, " +
+            "PARTITION_END_TIME) VALUES (?,?,?,?)";
+
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+        return Optional.of(this);
+    }
+
+    /**
+     * Update parent -> daughter relationship for CDC Streams.
+     * - find parent partition id using start/end keys of daughters
+     * - upsert partition metadata for the 2 daughters
+     * - update the end time on the parent's partition metadata
+     * @param c           the environment to interact with the framework and 
master
+     * @param regionInfoA the left daughter region
+     * @param regionInfoB the right daughter region
+     */
+    @Override
+    public void postCompletedSplitRegionAction(final 
ObserverContext<MasterCoprocessorEnvironment> c,
+                                               final RegionInfo regionInfoA, 
final RegionInfo regionInfoB) {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        try {
+            Connection conn  = QueryUtil.getConnectionOnServer(conf);
+            // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, 
regionInfoA.getTable());
+            if (phoenixTable == null) {
+                LOGGER.info("Not a Phoenix Table, skipping partition metadata 
update.");
+                return;
+            }
+            // find streamName with ENABLED status
+            String tableName = phoenixTable.getName().getString();
+            PreparedStatement pstmt = 
conn.prepareStatement(STREAM_STATUS_QUERY);
+            pstmt.setString(1, tableName);
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                String streamName = rs.getString(1);
+                String parentPartitionID = getParentPartitionId(conn, 
tableName, streamName, regionInfoA, regionInfoB);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoA);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoB);
+                updateParentPartitionEndTime(conn, tableName, streamName, 
parentPartitionID, regionInfoA.getRegionId());
+            }
+        } catch (SQLException e) {
+            LOGGER.error("Unable to update CDC Stream Partition metadata: " + 
e);
+        }
+    }
+
+    private PTable getPhoenixTable(Connection conn, TableName tableName) 
throws SQLException {
+        PTable pTable;
+        try {
+            pTable = PhoenixRuntime.getTable(conn, tableName.toString());
+        } catch (TableNotFoundException e) {
+            return null;
+        }
+        return pTable;
+    }
+
+    /**
+     * Lookup parent's partition id (region's encoded name) in 
SYSTEM.CDC_STREAM.
+     * We infer start and end key for the parent from the two daughters' 
start/end keys.
+     */
+    private String getParentPartitionId(Connection conn, String tableName, 
String streamName,
+                                        RegionInfo regionInfoA, RegionInfo 
regionInfoB)
+            throws SQLException {
+        byte[] inferredParentStartKey =
+                Bytes.compareTo(regionInfoA.getStartKey(), 
regionInfoB.getStartKey()) < 0 ?
+                regionInfoA.getStartKey() : regionInfoB.getStartKey();
+
+        byte[] inferredParentEndKey = (regionInfoB.getEndKey().length == 0 || 
regionInfoA.getEndKey().length == 0)
+                ? new byte[0]
+                : Bytes.compareTo(regionInfoA.getEndKey(), 
regionInfoB.getEndKey()) < 0
+                    ? regionInfoB.getEndKey()
+                    : regionInfoA.getEndKey();
+
+        StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY);
+        if (inferredParentStartKey.length == 0) {
+            qb.append(" AND PARTITION_START_KEY IS NULL ");
+        } else {
+            qb.append(" AND PARTITION_START_KEY = ? ");
+        }
+        if (inferredParentEndKey.length == 0) {
+            qb.append(" AND PARTITION_END_KEY IS NULL ");
+        } else {
+            qb.append(" AND PARTITION_END_KEY = ? ");
+        }
+
+        PreparedStatement pstmt = conn.prepareStatement(qb.toString());
+        int index = 1;
+        pstmt.setString(index++, tableName);
+        pstmt.setString(index++, streamName);
+        if (inferredParentStartKey.length > 0) pstmt.setBytes(index++, 
inferredParentStartKey);
+        if (inferredParentEndKey.length > 0) pstmt.setBytes(index++, 
inferredParentEndKey);
+        LOGGER.info(pstmt.toString());
+        ResultSet rs = pstmt.executeQuery();
+        if (rs.next()) {
+            return rs.getString(1);
+        } else {
+            throw new SQLException("Could not find parent of the provided 
daughters.");
+        }
+    }
+
+    /**
+     * Insert partition metadata for a daughter region from the split.
+     */
+    private void upsertDaughterPartition(Connection conn, String tableName,
+                                         String streamName, String 
parentPartitionID,
+                                         RegionInfo regionInfo)
+            throws SQLException {
+        String partitionId = regionInfo.getEncodedName();
+        long startTime = regionInfo.getRegionId();
+        byte[] startKey = regionInfo.getStartKey();
+        byte[] endKey = regionInfo.getEndKey();
+        PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
+        pstmt.setString(1, tableName);
+        pstmt.setString(2, streamName);
+        pstmt.setString(3, partitionId);
+        pstmt.setString(4, parentPartitionID);
+        pstmt.setLong(5, startTime);
+        // endTime in not set when inserting a new partition
+        pstmt.setLong(6, -1L);

Review Comment:
   Shall we keep it as `pstmt.setNull(6, Types.BIGINT)`?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java:
##########
@@ -0,0 +1,187 @@
+package org.apache.phoenix.coprocessor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+
+/**
+ * Master Coprocessor for Phoenix.
+ */
+public class PhoenixMasterObserver implements MasterObserver, 
MasterCoprocessor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixMasterObserver.class);
+
+    private static final String STREAM_STATUS_QUERY
+            = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME +
+            "WHERE TABLE_NAME = ? AND STREAM_STATUS='"
+            + CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue() + "'";
+
+    // tableName, streamName, partitionId, parentId, startTime, endTime, 
startKey, endKey
+    private static final String PARTITION_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + "VALUES 
(?,?,?,?,?,?,?,?)";
+
+    private static final String PARENT_PARTITION_QUERY
+            = "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME +
+            " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
+
+    private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
+            = "UPSERT INTO" + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, 
STREAM_NAME, PARTITION_ID, " +
+            "PARTITION_END_TIME) VALUES (?,?,?,?)";
+
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+        return Optional.of(this);
+    }
+
+    /**
+     * Update parent -> daughter relationship for CDC Streams.
+     * - find parent partition id using start/end keys of daughters
+     * - upsert partition metadata for the 2 daughters
+     * - update the end time on the parent's partition metadata
+     * @param c           the environment to interact with the framework and 
master
+     * @param regionInfoA the left daughter region
+     * @param regionInfoB the right daughter region
+     */
+    @Override
+    public void postCompletedSplitRegionAction(final 
ObserverContext<MasterCoprocessorEnvironment> c,
+                                               final RegionInfo regionInfoA, 
final RegionInfo regionInfoB) {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        try {
+            Connection conn  = QueryUtil.getConnectionOnServer(conf);
+            // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, 
regionInfoA.getTable());
+            if (phoenixTable == null) {
+                LOGGER.info("Not a Phoenix Table, skipping partition metadata 
update.");
+                return;
+            }
+            // find streamName with ENABLED status
+            String tableName = phoenixTable.getName().getString();
+            PreparedStatement pstmt = 
conn.prepareStatement(STREAM_STATUS_QUERY);
+            pstmt.setString(1, tableName);
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                String streamName = rs.getString(1);
+                String parentPartitionID = getParentPartitionId(conn, 
tableName, streamName, regionInfoA, regionInfoB);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoA);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoB);
+                updateParentPartitionEndTime(conn, tableName, streamName, 
parentPartitionID, regionInfoA.getRegionId());
+            }
+        } catch (SQLException e) {
+            LOGGER.error("Unable to update CDC Stream Partition metadata: " + 
e);
+        }
+    }
+
+    private PTable getPhoenixTable(Connection conn, TableName tableName) 
throws SQLException {
+        PTable pTable;
+        try {
+            pTable = PhoenixRuntime.getTable(conn, tableName.toString());
+        } catch (TableNotFoundException e) {
+            return null;
+        }
+        return pTable;
+    }
+
+    /**
+     * Lookup parent's partition id (region's encoded name) in 
SYSTEM.CDC_STREAM.
+     * We infer start and end key for the parent from the two daughters' 
start/end keys.
+     */
+    private String getParentPartitionId(Connection conn, String tableName, 
String streamName,
+                                        RegionInfo regionInfoA, RegionInfo 
regionInfoB)
+            throws SQLException {
+        byte[] inferredParentStartKey =
+                Bytes.compareTo(regionInfoA.getStartKey(), 
regionInfoB.getStartKey()) < 0 ?
+                regionInfoA.getStartKey() : regionInfoB.getStartKey();
+
+        byte[] inferredParentEndKey = (regionInfoB.getEndKey().length == 0 || 
regionInfoA.getEndKey().length == 0)
+                ? new byte[0]
+                : Bytes.compareTo(regionInfoA.getEndKey(), 
regionInfoB.getEndKey()) < 0
+                    ? regionInfoB.getEndKey()
+                    : regionInfoA.getEndKey();
+
+        StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY);
+        if (inferredParentStartKey.length == 0) {
+            qb.append(" AND PARTITION_START_KEY IS NULL ");
+        } else {
+            qb.append(" AND PARTITION_START_KEY = ? ");
+        }
+        if (inferredParentEndKey.length == 0) {
+            qb.append(" AND PARTITION_END_KEY IS NULL ");
+        } else {
+            qb.append(" AND PARTITION_END_KEY = ? ");
+        }
+
+        PreparedStatement pstmt = conn.prepareStatement(qb.toString());
+        int index = 1;
+        pstmt.setString(index++, tableName);
+        pstmt.setString(index++, streamName);
+        if (inferredParentStartKey.length > 0) pstmt.setBytes(index++, 
inferredParentStartKey);
+        if (inferredParentEndKey.length > 0) pstmt.setBytes(index++, 
inferredParentEndKey);
+        LOGGER.info(pstmt.toString());
+        ResultSet rs = pstmt.executeQuery();
+        if (rs.next()) {
+            return rs.getString(1);
+        } else {
+            throw new SQLException("Could not find parent of the provided 
daughters.");

Review Comment:
   Shall we add the regionInfoA and regionInfoB start/end keys to the exception message? That would make it much easier to identify which split failed.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java:
##########
@@ -0,0 +1,187 @@
+package org.apache.phoenix.coprocessor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+
+/**
+ * Master Coprocessor for Phoenix.
+ */
+public class PhoenixMasterObserver implements MasterObserver, 
MasterCoprocessor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixMasterObserver.class);
+
+    private static final String STREAM_STATUS_QUERY
+            = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME +
+            "WHERE TABLE_NAME = ? AND STREAM_STATUS='"
+            + CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue() + "'";
+
+    // tableName, streamName, partitionId, parentId, startTime, endTime, 
startKey, endKey
+    private static final String PARTITION_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + "VALUES 
(?,?,?,?,?,?,?,?)";
+
+    private static final String PARENT_PARTITION_QUERY
+            = "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME +
+            " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
+
+    private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
+            = "UPSERT INTO" + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, 
STREAM_NAME, PARTITION_ID, " +
+            "PARTITION_END_TIME) VALUES (?,?,?,?)";
+
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+        return Optional.of(this);
+    }
+
+    /**
+     * Update parent -> daughter relationship for CDC Streams.
+     * - find parent partition id using start/end keys of daughters
+     * - upsert partition metadata for the 2 daughters
+     * - update the end time on the parent's partition metadata
+     * @param c           the environment to interact with the framework and 
master
+     * @param regionInfoA the left daughter region
+     * @param regionInfoB the right daughter region
+     */
+    @Override
+    public void postCompletedSplitRegionAction(final 
ObserverContext<MasterCoprocessorEnvironment> c,
+                                               final RegionInfo regionInfoA, 
final RegionInfo regionInfoB) {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        try {
+            Connection conn  = QueryUtil.getConnectionOnServer(conf);
+            // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, 
regionInfoA.getTable());
+            if (phoenixTable == null) {
+                LOGGER.info("Not a Phoenix Table, skipping partition metadata 
update.");
+                return;
+            }
+            // find streamName with ENABLED status
+            String tableName = phoenixTable.getName().getString();
+            PreparedStatement pstmt = 
conn.prepareStatement(STREAM_STATUS_QUERY);
+            pstmt.setString(1, tableName);
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                String streamName = rs.getString(1);
+                String parentPartitionID = getParentPartitionId(conn, 
tableName, streamName, regionInfoA, regionInfoB);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoA);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoB);
+                updateParentPartitionEndTime(conn, tableName, streamName, 
parentPartitionID, regionInfoA.getRegionId());
+            }
+        } catch (SQLException e) {
+            LOGGER.error("Unable to update CDC Stream Partition metadata: " + 
e);
+        }
+    }
+
+    private PTable getPhoenixTable(Connection conn, TableName tableName) 
throws SQLException {
+        PTable pTable;
+        try {
+            pTable = PhoenixRuntime.getTable(conn, tableName.toString());
+        } catch (TableNotFoundException e) {
+            return null;
+        }
+        return pTable;
+    }
+
+    /**
+     * Lookup parent's partition id (region's encoded name) in 
SYSTEM.CDC_STREAM.
+     * We infer start and end key for the parent from the two daughters' 
start/end keys.
+     */
+    private String getParentPartitionId(Connection conn, String tableName, 
String streamName,
+                                        RegionInfo regionInfoA, RegionInfo 
regionInfoB)
+            throws SQLException {
+        byte[] inferredParentStartKey =
+                Bytes.compareTo(regionInfoA.getStartKey(), 
regionInfoB.getStartKey()) < 0 ?
+                regionInfoA.getStartKey() : regionInfoB.getStartKey();
+
+        byte[] inferredParentEndKey = (regionInfoB.getEndKey().length == 0 || 
regionInfoA.getEndKey().length == 0)

Review Comment:
   Shouldn't `regionInfoA`'s end key be non-empty here, given that it is the left daughter? Or is it possible that regionInfoA
and regionInfoB could arrive swapped?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java:
##########
@@ -0,0 +1,187 @@
+package org.apache.phoenix.coprocessor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+
+/**
+ * Master Coprocessor for Phoenix.
+ */
+public class PhoenixMasterObserver implements MasterObserver, 
MasterCoprocessor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixMasterObserver.class);
+
+    private static final String STREAM_STATUS_QUERY
+            = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME +
+            "WHERE TABLE_NAME = ? AND STREAM_STATUS='"
+            + CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue() + "'";
+
+    // tableName, streamName, partitionId, parentId, startTime, endTime, 
startKey, endKey
+    private static final String PARTITION_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + "VALUES 
(?,?,?,?,?,?,?,?)";
+
+    private static final String PARENT_PARTITION_QUERY
+            = "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME +
+            " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
+
+    private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
+            = "UPSERT INTO" + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, 
STREAM_NAME, PARTITION_ID, " +
+            "PARTITION_END_TIME) VALUES (?,?,?,?)";
+
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+        return Optional.of(this);
+    }
+
+    /**
+     * Update parent -> daughter relationship for CDC Streams.
+     * - find parent partition id using start/end keys of daughters
+     * - upsert partition metadata for the 2 daughters
+     * - update the end time on the parent's partition metadata
+     * @param c           the environment to interact with the framework and 
master
+     * @param regionInfoA the left daughter region
+     * @param regionInfoB the right daughter region
+     */
+    @Override
+    public void postCompletedSplitRegionAction(final 
ObserverContext<MasterCoprocessorEnvironment> c,
+                                               final RegionInfo regionInfoA, 
final RegionInfo regionInfoB) {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        try {
+            Connection conn  = QueryUtil.getConnectionOnServer(conf);
+            // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, 
regionInfoA.getTable());
+            if (phoenixTable == null) {
+                LOGGER.info("Not a Phoenix Table, skipping partition metadata 
update.");
+                return;
+            }
+            // find streamName with ENABLED status
+            String tableName = phoenixTable.getName().getString();
+            PreparedStatement pstmt = 
conn.prepareStatement(STREAM_STATUS_QUERY);
+            pstmt.setString(1, tableName);
+            ResultSet rs = pstmt.executeQuery();
+            if (rs.next()) {
+                String streamName = rs.getString(1);
+                String parentPartitionID = getParentPartitionId(conn, 
tableName, streamName, regionInfoA, regionInfoB);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoA);
+                upsertDaughterPartition(conn, tableName, streamName, 
parentPartitionID, regionInfoB);
+                updateParentPartitionEndTime(conn, tableName, streamName, 
parentPartitionID, regionInfoA.getRegionId());
+            }
+        } catch (SQLException e) {
+            LOGGER.error("Unable to update CDC Stream Partition metadata: " + 
e);

Review Comment:
   Including the regionInfoA and regionInfoB details in this error log message would be great for 
debugging



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java:
##########
@@ -0,0 +1,187 @@
+package org.apache.phoenix.coprocessor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+
+/**
+ * Master Coprocessor for Phoenix.
+ */
+public class PhoenixMasterObserver implements MasterObserver, 
MasterCoprocessor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixMasterObserver.class);
+
+    private static final String STREAM_STATUS_QUERY
+            = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME +
+            "WHERE TABLE_NAME = ? AND STREAM_STATUS='"
+            + CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue() + "'";
+
+    // tableName, streamName, partitionId, parentId, startTime, endTime, 
startKey, endKey
+    private static final String PARTITION_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + "VALUES 
(?,?,?,?,?,?,?,?)";
+
+    private static final String PARENT_PARTITION_QUERY
+            = "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME +
+            " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
+
+    private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
+            = "UPSERT INTO" + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, 
STREAM_NAME, PARTITION_ID, " +
+            "PARTITION_END_TIME) VALUES (?,?,?,?)";
+
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+        return Optional.of(this);
+    }
+
+    /**
+     * Update parent -> daughter relationship for CDC Streams.
+     * - find parent partition id using start/end keys of daughters
+     * - upsert partition metadata for the 2 daughters
+     * - update the end time on the parent's partition metadata
+     * @param c           the environment to interact with the framework and 
master
+     * @param regionInfoA the left daughter region
+     * @param regionInfoB the right daughter region
+     */
+    @Override
+    public void postCompletedSplitRegionAction(final 
ObserverContext<MasterCoprocessorEnvironment> c,
+                                               final RegionInfo regionInfoA, 
final RegionInfo regionInfoB) {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        try {
+            Connection conn  = QueryUtil.getConnectionOnServer(conf);
+            // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, 
regionInfoA.getTable());
+            if (phoenixTable == null) {
+                LOGGER.info("Not a Phoenix Table, skipping partition metadata 
update.");

Review Comment:
   log `phoenixTable` with the message?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to