This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new a51a57a94b PHOENIX-7460 : Update CDC Stream metadata when a data table region splits (#2051)
a51a57a94b is described below
commit a51a57a94b431d977ee4106796a3e5222f3cbcd6
Author: Palash Chauhan <[email protected]>
AuthorDate: Mon Jan 6 20:08:51 2025 -0800
PHOENIX-7460 : Update CDC Stream metadata when a data table region splits (#2051)
Co-authored-by: Palash Chauhan <[email protected]>
---
.../phoenix/coprocessor/PhoenixMasterObserver.java | 209 +++++++++++++++++
.../tasks/CdcStreamPartitionMetadataTask.java | 15 +-
.../org/apache/phoenix/end2end/CDCStreamIT.java | 252 ++++++++++++++++++++-
3 files changed, 467 insertions(+), 9 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
new file mode 100644
index 0000000000..80cfbc718d
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+
+/**
+ * Master Coprocessor for Phoenix.
+ */
+public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMasterObserver.class);
+
+ private static final String STREAM_STATUS_QUERY
+ = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME
+ + " WHERE TABLE_NAME = ? AND STREAM_STATUS='"
+ + CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue() + "'";
+
+    // tableName, streamName, partitionId, parentId, startTime, endTime, startKey, endKey
+    private static final String PARTITION_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,?,?,?,?,?)";
+
+ private static final String PARENT_PARTITION_QUERY
+ = "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
+ + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
+
+ private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
+ = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME,
STREAM_NAME, PARTITION_ID, "
+ + "PARTITION_END_TIME) VALUES (?,?,?,?)";
+
+ @Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ /**
+ * Update parent -> daughter relationship for CDC Streams.
+ * - find parent partition id using start/end keys of daughters
+ * - upsert partition metadata for the 2 daughters
+ * - update the end time on the parent's partition metadata
+     * @param c the environment to interact with the framework and master
+ * @param regionInfoA the left daughter region
+ * @param regionInfoB the right daughter region
+ */
+ @Override
+    public void postCompletedSplitRegionAction(final ObserverContext<MasterCoprocessorEnvironment> c,
+ final RegionInfo regionInfoA,
+ final RegionInfo regionInfoB) {
+ Configuration conf = c.getEnvironment().getConfiguration();
+ try {
+ Connection conn = QueryUtil.getConnectionOnServer(conf);
+ // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, regionInfoA.getTable());
+ if (phoenixTable == null) {
+ LOGGER.info("{} is not a Phoenix Table, skipping partition
metadata update.",
+ regionInfoA.getTable());
+ return;
+ }
+ // find streamName with ENABLED status
+ String tableName = phoenixTable.getName().getString();
+            PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
+ pstmt.setString(1, tableName);
+ ResultSet rs = pstmt.executeQuery();
+ if (rs.next()) {
+ String streamName = rs.getString(1);
+ LOGGER.info("Updating partition metadata for table={},
stream={} daughters {} {}",
+ tableName, streamName, regionInfoA.getEncodedName(),
regionInfoB.getEncodedName());
+ String parentPartitionID = getParentPartitionId(conn,
tableName, streamName, regionInfoA, regionInfoB);
+ upsertDaughterPartition(conn, tableName, streamName,
parentPartitionID, regionInfoA);
+ upsertDaughterPartition(conn, tableName, streamName,
parentPartitionID, regionInfoB);
+ updateParentPartitionEndTime(conn, tableName, streamName,
parentPartitionID, regionInfoA.getRegionId());
+ }
+ } catch (SQLException e) {
+ LOGGER.error("Unable to update CDC Stream Partition metadata
during split with daughter regions: {} {}",
+ regionInfoA.getEncodedName(),
regionInfoB.getEncodedName(), e);
+ }
+ }
+
+    private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException {
+ PTable pTable;
+ try {
+ pTable = PhoenixRuntime.getTable(conn, tableName.toString());
+ } catch (TableNotFoundException e) {
+ return null;
+ }
+ return pTable;
+ }
+
+ /**
+     * Look up the parent's partition id (the region's encoded name) in SYSTEM.CDC_STREAM.
+     * RegionInfoA is the left daughter and RegionInfoB is the right daughter, so the parent's
+     * key range is [RegionInfoA startKey, RegionInfoB endKey].
+ */
+    private String getParentPartitionId(Connection conn, String tableName, String streamName,
+                                        RegionInfo regionInfoA, RegionInfo regionInfoB)
+ throws SQLException {
+ byte[] parentStartKey = regionInfoA.getStartKey();
+ byte[] parentEndKey = regionInfoB.getEndKey();
+
+ StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY);
+ if (parentStartKey.length == 0) {
+ qb.append(" AND PARTITION_START_KEY IS NULL ");
+ } else {
+ qb.append(" AND PARTITION_START_KEY = ? ");
+ }
+ if (parentEndKey.length == 0) {
+ qb.append(" AND PARTITION_END_KEY IS NULL ");
+ } else {
+ qb.append(" AND PARTITION_END_KEY = ? ");
+ }
+
+ PreparedStatement pstmt = conn.prepareStatement(qb.toString());
+ int index = 1;
+ pstmt.setString(index++, tableName);
+ pstmt.setString(index++, streamName);
+ if (parentStartKey.length > 0) pstmt.setBytes(index++, parentStartKey);
+ if (parentEndKey.length > 0) pstmt.setBytes(index++, parentEndKey);
+ LOGGER.info("Query to get parent partition id: " + pstmt);
+ ResultSet rs = pstmt.executeQuery();
+ if (rs.next()) {
+ return rs.getString(1);
+ } else {
+            throw new SQLException(String.format("Could not find parent of the provided daughters: "
+                    + "startKeyA=%s endKeyA=%s startKeyB=%s endKeyB=%s",
+ Bytes.toStringBinary(regionInfoA.getStartKey()),
+ Bytes.toStringBinary(regionInfoA.getEndKey()),
+ Bytes.toStringBinary(regionInfoB.getStartKey()),
+ Bytes.toStringBinary(regionInfoB.getEndKey())));
+ }
+ }
+
+ /**
+ * Insert partition metadata for a daughter region from the split.
+ */
+ private void upsertDaughterPartition(Connection conn, String tableName,
+                                         String streamName, String parentPartitionID,
+ RegionInfo regionInfo)
+ throws SQLException {
+ String partitionId = regionInfo.getEncodedName();
+ long startTime = regionInfo.getRegionId();
+ byte[] startKey = regionInfo.getStartKey();
+ byte[] endKey = regionInfo.getEndKey();
+ PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
+ pstmt.setString(1, tableName);
+ pstmt.setString(2, streamName);
+ pstmt.setString(3, partitionId);
+ pstmt.setString(4, parentPartitionID);
+ pstmt.setLong(5, startTime);
+        // endTime is not set when inserting a new partition
+ pstmt.setNull(6, Types.BIGINT);
+ pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
+ pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
+ pstmt.executeUpdate();
+ conn.commit();
+ }
+
+ /**
+ * Update parent partition's endTime by setting it to daughter's startTime.
+ */
+    private void updateParentPartitionEndTime(Connection conn, String tableName,
+                                              String streamName, String parentPartitionID,
+                                              long daughterStartTime) throws SQLException {
+        PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL);
+ pstmt.setString(1, tableName);
+ pstmt.setString(2, streamName);
+ pstmt.setString(3, parentPartitionID);
+ pstmt.setLong(4, daughterStartTime);
+ pstmt.executeUpdate();
+ conn.commit();
+ }
+}
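
Note: the observer above takes effect only once it is registered with the HBase master. A minimal sketch of doing so programmatically, assuming a vanilla HBase Configuration; the property key is the same one CDCStreamIT sets below, while the wrapper class and its main method are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.coprocessor.PhoenixMasterObserver;

    public class RegisterMasterObserverExample {
        public static void main(String[] args) {
            // Register PhoenixMasterObserver with the HBase master so that
            // postCompletedSplitRegionAction fires after each region split.
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.coprocessor.master.classes",
                    PhoenixMasterObserver.class.getName());
            // Equivalently, add this property to hbase-site.xml before the
            // master starts.
            System.out.println(conf.get("hbase.coprocessor.master.classes"));
        }
    }
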
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
index c84853481c..de709e8aef 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
@@ -37,6 +37,7 @@ import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
+import java.sql.Types;
import java.util.List;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
@@ -55,9 +56,8 @@ public class CdcStreamPartitionMetadataTask extends BaseTask {
private static final String CDC_STREAM_STATUS_UPSERT_SQL
= "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?,
?)";
- // parent_partition_id will be null, set partition_end_time to -1
private static final String CDC_STREAM_PARTITION_UPSERT_SQL
- = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES
(?,?,?,null,?,-1,?,?)";
+ = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES
(?,?,?,?,?,?,?,?)";
@Override
public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
@@ -136,13 +136,16 @@ public class CdcStreamPartitionMetadataTask extends BaseTask {
throws SQLException {
try (PreparedStatement ps = pconn.prepareStatement(CDC_STREAM_PARTITION_UPSERT_SQL)) {
for (HRegionLocation tableRegion : tableRegions) {
- RegionInfo ri = tableRegion.getRegionInfo();
+ // set parent_partition_id, partition_end_time to null
+ RegionInfo ri = tableRegion.getRegion();
ps.setString(1, tableName);
ps.setString(2, streamName);
ps.setString(3, ri.getEncodedName());
- ps.setLong(4, ri.getRegionId());
- ps.setBytes(5, ri.getStartKey());
- ps.setBytes(6, ri.getEndKey());
+ ps.setNull(4, Types.VARCHAR);
+ ps.setLong(5, ri.getRegionId());
+ ps.setNull(6, Types.BIGINT);
+ ps.setBytes(7, ri.getStartKey());
+ ps.setBytes(8, ri.getEndKey());
ps.executeUpdate();
}
pconn.commit();
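
Note: with this change the bootstrap task writes NULL for both the parent partition id and the partition end time, so a stream's partition lineage can be read straight out of SYSTEM.CDC_STREAM. A hedged sketch of such a read; the PARTITION_ID and PARENT_PARTITION_ID column names follow the comments and upserts above, while the JDBC URL, table and stream names are placeholders:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class StreamLineageExample {
        // Prints "partition <- parent" pairs; bootstrap partitions have a null
        // parent, daughters carry the encoded name of the region that was split.
        static void printLineage(Connection conn, String tableName, String streamName)
                throws SQLException {
            String sql = "SELECT PARTITION_ID, PARENT_PARTITION_ID FROM SYSTEM.CDC_STREAM"
                    + " WHERE TABLE_NAME = ? AND STREAM_NAME = ?";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, tableName);
                ps.setString(2, streamName);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getString(1) + " <- " + rs.getString(2));
                    }
                }
            }
        }

        public static void main(String[] args) throws SQLException {
            // Placeholder connection string; point it at a live cluster.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                printLineage(conn, "MY_TABLE", "MY_STREAM");
            }
        }
    }
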
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index 461ee8a7c0..d70821bb94 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -18,14 +18,22 @@
package org.apache.phoenix.end2end;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -40,6 +48,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -47,9 +56,10 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
public class CDCStreamIT extends CDCBaseIT {
private static RegionCoprocessorEnvironment TaskRegionEnvironment;
@@ -62,6 +72,7 @@ public class CDCStreamIT extends CDCBaseIT {
Long.toString(Long.MAX_VALUE));
props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
+ props.put("hbase.coprocessor.master.classes",
PhoenixMasterObserver.class.getName());
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
TaskRegionEnvironment =
getUtility()
@@ -71,6 +82,7 @@ public class CDCStreamIT extends CDCBaseIT {
.get(0).getCoprocessorHost()
.findCoprocessorEnvironment(TaskRegionObserver.class.getName());
}
+
@Test
public void testStreamPartitionMetadataBootstrap() throws Exception {
Connection conn = newConnection();
@@ -149,9 +161,172 @@ public class CDCStreamIT extends CDCBaseIT {
assertStreamStatus(conn, tableName, streamName,
CDCUtil.CdcStreamStatus.ENABLING);
}
+ /**
+     * Split the only region of the table with empty start key and empty end key.
+     */
+    @Test
+    public void testPartitionMetadataTableWithSingleRegionSplits() throws Exception {
+ // create table, cdc and bootstrap stream metadata
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ createTableAndEnableCDC(conn, tableName);
+
+ //split the only region somewhere in the middle
+ splitTable(conn, tableName, Bytes.toBytes("m"));
+
+        //check partition metadata - daughter regions are inserted and parent's end time is updated.
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" +
tableName + "'");
+ PartitionMetadata parent = null;
+ List<PartitionMetadata> daughters = new ArrayList<>();
+ while (rs.next()) {
+ PartitionMetadata pm = new PartitionMetadata(rs);
+ // parent which was split
+ if (pm.endTime > 0) {
+ parent = pm;
+ } else {
+ daughters.add(pm);
+ }
+ }
+ assertNotNull(parent);
+ assertEquals(2, daughters.size());
+ assertEquals(daughters.get(0).startTime, parent.endTime);
+ assertEquals(daughters.get(1).startTime, parent.endTime);
+ assertEquals(parent.partitionId, daughters.get(0).parentPartitionId);
+ assertEquals(parent.partitionId, daughters.get(1).parentPartitionId);
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey == null && d.endKey != null && d.endKey[0] == 'm'));
+        assertTrue(daughters.stream().anyMatch(d -> d.endKey == null && d.startKey != null && d.startKey[0] == 'm'));
+ }
+
+ /**
+ * Split the first region of the table with empty start key.
+ */
+ @Test
+ public void testPartitionMetadataFirstRegionSplits() throws Exception {
+ // create table, cdc and bootstrap stream metadata
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ createTableAndEnableCDC(conn, tableName);
+
+ //split the only region [null, null]
+ splitTable(conn, tableName, Bytes.toBytes("l"));
+ // we have 2 regions - [null, l], [l, null], split the first region
+ splitTable(conn, tableName, Bytes.toBytes("d"));
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" +
tableName + "'");
+ PartitionMetadata grandparent = null, splitParent = null,
unSplitParent = null;
+ List<PartitionMetadata> daughters = new ArrayList<>();
+ while (rs.next()) {
+ PartitionMetadata pm = new PartitionMetadata(rs);
+ if (pm.endTime > 0) {
+ if (pm.startKey == null && pm.endKey == null) {
+ grandparent = pm;
+ } else {
+ splitParent = pm;
+ }
+ } else if (pm.endKey == null) {
+ unSplitParent = pm;
+ } else {
+ daughters.add(pm);
+ }
+ }
+ assertNotNull(grandparent);
+ assertNotNull(unSplitParent);
+ assertNotNull(splitParent);
+ assertEquals(2, daughters.size());
+ assertEquals(daughters.get(0).startTime, splitParent.endTime);
+ assertEquals(daughters.get(1).startTime, splitParent.endTime);
+        assertEquals(splitParent.partitionId, daughters.get(0).parentPartitionId);
+        assertEquals(splitParent.partitionId, daughters.get(1).parentPartitionId);
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey == null && d.endKey != null && d.endKey[0] == 'd'));
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey != null && d.startKey[0] == 'd' && d.endKey[0] == 'l'));
+ }
+
+ /**
+ * Split the last region of the table with empty end key.
+ */
+ @Test
+ public void testPartitionMetadataLastRegionSplits() throws Exception {
+ // create table, cdc and bootstrap stream metadata
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ createTableAndEnableCDC(conn, tableName);
+
+ //split the only region [null, null]
+ splitTable(conn, tableName, Bytes.toBytes("l"));
+ // we have 2 regions - [null, l], [l, null], split the second region
+ splitTable(conn, tableName, Bytes.toBytes("q"));
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" +
tableName + "'");
+ PartitionMetadata grandparent = null, splitParent = null,
unSplitParent = null;
+ List<PartitionMetadata> daughters = new ArrayList<>();
+ while (rs.next()) {
+ PartitionMetadata pm = new PartitionMetadata(rs);
+ if (pm.endTime > 0) {
+ if (pm.startKey == null && pm.endKey == null) {
+ grandparent = pm;
+ } else {
+ splitParent = pm;
+ }
+ } else if (pm.startKey == null) {
+ unSplitParent = pm;
+ } else {
+ daughters.add(pm);
+ }
+ }
+ assertNotNull(grandparent);
+ assertNotNull(unSplitParent);
+ assertNotNull(splitParent);
+ assertEquals(2, daughters.size());
+ assertEquals(daughters.get(0).startTime, splitParent.endTime);
+ assertEquals(daughters.get(1).startTime, splitParent.endTime);
+        assertEquals(splitParent.partitionId, daughters.get(0).parentPartitionId);
+        assertEquals(splitParent.partitionId, daughters.get(1).parentPartitionId);
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'l' && d.endKey[0] == 'q'));
+        assertTrue(daughters.stream().anyMatch(d -> d.endKey == null && d.startKey != null && d.startKey[0] == 'q'));
+ }
+
+ /**
+ * Split a middle region of the table with non-empty start/end key.
+ */
+ @Test
+ public void testPartitionMetadataMiddleRegionSplits() throws Exception {
+ // create table, cdc and bootstrap stream metadata
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ createTableAndEnableCDC(conn, tableName);
+
+ //split the only region [null, null]
+ splitTable(conn, tableName, Bytes.toBytes("d"));
+ // we have 2 regions - [null, d], [d, null], split the second region
+ splitTable(conn, tableName, Bytes.toBytes("q"));
+        // we have 3 regions - [null, d], [d, q], [q, null], split the second region
+ splitTable(conn, tableName, Bytes.toBytes("j"));
+ // [null, d], [d, j], [j, q], [q, null]
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" +
tableName + "'");
+ PartitionMetadata parent = null;
+ List<PartitionMetadata> daughters = new ArrayList<>();
+ while (rs.next()) {
+ PartitionMetadata pm = new PartitionMetadata(rs);
+ if (pm.startKey != null && pm.endKey != null) {
+ if (pm.endTime > 0) parent = pm;
+ else daughters.add(pm);
+ }
+ }
+ assertNotNull(parent);
+ assertEquals(2, daughters.size());
+ assertEquals(daughters.get(0).startTime, parent.endTime);
+ assertEquals(daughters.get(1).startTime, parent.endTime);
+ assertEquals(parent.partitionId, daughters.get(0).parentPartitionId);
+ assertEquals(parent.partitionId, daughters.get(1).parentPartitionId);
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'd' && d.endKey[0] == 'j'));
+        assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'j' && d.endKey[0] == 'q'));
+ }
+
private String getStreamName(Connection conn, String tableName, String cdcName) throws SQLException {
return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName, CDCUtil.getCDCCreationTimestamp(
-        conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+            conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
}
private void assertStreamStatus(Connection conn, String tableName, String streamName,
@@ -160,7 +335,7 @@ public class CDCStreamIT extends CDCBaseIT {
+ SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME='" + tableName +
"' AND STREAM_NAME='" + streamName + "'");
assertTrue(rs.next());
- Assert.assertEquals(status.getSerializedValue(), rs.getString(1));
+ assertEquals(status.getSerializedValue(), rs.getString(1));
}
private void assertPartitionMetadata(Connection conn, String tableName, String cdcName)
@@ -180,4 +355,75 @@ public class CDCStreamIT extends CDCBaseIT {
assertTrue(rs.next());
}
}
+
+    private void createTableAndEnableCDC(Connection conn, String tableName) throws Exception {
+ String cdcName = generateUniqueName();
+ String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " ( k VARCHAR PRIMARY KEY," + "
v1 INTEGER,"
+ + " v2 VARCHAR)");
+ createCDC(conn, cdc_sql, null);
+ String streamName = getStreamName(conn, tableName, cdcName);
+ TaskRegionObserver.SelfHealingTask task =
+ new TaskRegionObserver.SelfHealingTask(
+                        TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+ task.run();
+        assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLED);
+
+ //upsert sample data
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('a', 1, 'foo')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('b', 2, 'bar')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('e', 3, 'alice')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('j', 4, 'bob')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('m', 5, 'cat')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('p', 6, 'cat')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('t', 7, 'cat')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('z', 8, 'cat')");
+ }
+
+ /**
+ * Split the table at the provided split point.
+ */
+    private void splitTable(Connection conn, String tableName, byte[] splitPoint) throws Exception {
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ Admin admin = services.getAdmin();
+ Configuration configuration =
+                conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
+ org.apache.hadoop.hbase.client.Connection hbaseConn =
+ ConnectionFactory.createConnection(configuration);
+        RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+ int nRegions = regionLocator.getAllRegionLocations().size();
+ try {
+ admin.split(TableName.valueOf(tableName), splitPoint);
+ int retryCount = 0;
+ do {
+ Thread.sleep(2000);
+ retryCount++;
+            } while (retryCount < 10 && regionLocator.getAllRegionLocations().size() == nRegions);
+            Assert.assertNotEquals(regionLocator.getAllRegionLocations().size(), nRegions);
+ } finally {
+ admin.close();
+ }
+ }
+
+ /**
+     * Inner class representing partition metadata for a region, i.e. a single row from SYSTEM.CDC_STREAM.
+ */
+ private class PartitionMetadata {
+ public String partitionId;
+ public String parentPartitionId;
+ public Long startTime;
+ public Long endTime;
+ public byte[] startKey;
+ public byte[] endKey;
+
+ public PartitionMetadata(ResultSet rs) throws SQLException {
+ partitionId = rs.getString(3);
+ parentPartitionId = rs.getString(4);
+ startTime = rs.getLong(5);
+ endTime = rs.getLong(6);
+ startKey = rs.getBytes(7);
+ endKey = rs.getBytes(8);
+ }
+ }
}
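
Note: to make the split assertions concrete, after testPartitionMetadataTableWithSingleRegionSplits splits the lone region at 'm', SYSTEM.CDC_STREAM should hold roughly the rows below. The ids p0/d1/d2 and times t0/t1 are hypothetical placeholders; the invariants are that the parent's end time equals both daughters' start times (the daughters' regionId) and that both daughters reference the parent's partition id.

    PARTITION_ID   PARENT_PARTITION_ID   START_TIME   END_TIME   START_KEY   END_KEY
    p0             null                  t0           t1         null        null
    d1             p0                    t1           null       null        m
    d2             p0                    t1           null       m           null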