This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 30801d9a65 PHOENIX-7499 : Update stream metadata when data table regions merge (#2057)
30801d9a65 is described below
commit 30801d9a652fc44a1444af1b54c5ed11d0153fc6
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Feb 6 14:20:04 2025 -0800
PHOENIX-7499 : Update stream metadata when data table regions merge (#2057)
Co-authored-by: Palash Chauhan <[email protected]>
---
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 1 +
.../org/apache/phoenix/query/QueryConstants.java | 2 +
.../phoenix/coprocessor/PhoenixMasterObserver.java | 219 ++++++++++++++++-----
.../org/apache/phoenix/end2end/CDCStreamIT.java | 206 +++++++++++++++----
.../java/org/apache/phoenix/util/TestUtil.java | 83 +++++++-
5 files changed, 412 insertions(+), 99 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index ff250f40ba..1a4b2996b7 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -461,6 +461,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
            SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CDC_STREAM_TABLE);
public static final String STREAM_NAME = "STREAM_NAME";
public static final String STREAM_STATUS = "STREAM_STATUS";
+ public static final String STREAM_TYPE = "STREAM_TYPE";
public static final String PARTITION_ID = "PARTITION_ID";
public static final String PARENT_PARTITION_ID = "PARENT_PARTITION_ID";
public static final String PARTITION_START_TIME = "PARTITION_START_TIME";
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index d59249fb58..02589a9265 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -142,6 +142,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_STATUS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE;
@@ -658,6 +659,7 @@ public interface QueryConstants {
STREAM_NAME + " VARCHAR NOT NULL," +
// Non-PK columns
STREAM_STATUS + " VARCHAR,\n" +
+ STREAM_TYPE + " VARCHAR,\n" +
"CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
TABLE_NAME + "," + STREAM_NAME + "))\n" +
HConstants.VERSIONS + "=%s,\n" +
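For illustration only (not part of this commit): a minimal JDBC sketch of how a client could read the new STREAM_TYPE column. The JDBC URL, the example table name, and the assumption that the status table template above resolves to SYSTEM.CDC_STREAM_STATUS are all illustrative.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class StreamTypeReadSketch {
        public static void main(String[] args) throws Exception {
            // Assumes a locally reachable Phoenix cluster; URL and table name are placeholders.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 PreparedStatement ps = conn.prepareStatement(
                         "SELECT STREAM_NAME, STREAM_STATUS, STREAM_TYPE"
                                 + " FROM SYSTEM.CDC_STREAM_STATUS WHERE TABLE_NAME = ?")) {
                ps.setString(1, "MY_TABLE");
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getString(1) + " " + rs.getString(2) + " " + rs.getString(3));
                    }
                }
            }
        }
    }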
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index f51cf7c154..e335d7604a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -23,8 +23,10 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
@@ -60,9 +62,13 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
     private static final String PARTITION_UPSERT_SQL
             = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,?,?,?,?,?)";
-    private static final String PARENT_PARTITION_QUERY
+    private static final String PARENT_PARTITION_QUERY_FOR_SPLIT
             = "SELECT PARTITION_ID, PARENT_PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
-            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
+            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_END_TIME IS NULL ";
+
+    private static final String PARENT_PARTITION_QUERY_FOR_MERGE
+            = "SELECT PARENT_PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
+            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_ID = ?";
     private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
             = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, STREAM_NAME, PARTITION_ID, "
@@ -74,7 +80,7 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
     }
/**
- * Update parent -> daughter relationship for CDC Streams.
+     * Update parent -> daughter relationship for CDC Streams when a region splits.
* - find parent partition id using start/end keys of daughters
* - upsert partition metadata for the 2 daughters
* - update the end time on the parent's partition metadata
@@ -95,20 +101,23 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
regionInfoA.getTable());
return;
}
-            // find streamName with ENABLED status
             String tableName = phoenixTable.getName().getString();
-            PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
-            pstmt.setString(1, tableName);
-            ResultSet rs = pstmt.executeQuery();
-            if (rs.next()) {
-                String streamName = rs.getString(1);
-                LOGGER.info("Updating partition metadata for table={}, stream={} daughters {} {}",
+            String streamName = getStreamName(conn, tableName);
+            if (streamName != null) {
+                LOGGER.info("Updating split partition metadata for table={}, stream={} daughters {} {}",
                         tableName, streamName, regionInfoA.getEncodedName(), regionInfoB.getEncodedName());
-                // ancestorIDs = [parentId, grandparentId1]
-                List<String> ancestorIDs = getAncestorIds(conn, tableName, streamName, regionInfoA, regionInfoB);
-                upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoA);
-                upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoB);
-                updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, regionInfoA.getRegionId());
+                // ancestorIDs = [parentId, grandparentId1, grandparentId2...]
+                List<String> ancestorIDs
+                        = getAncestorIdsForSplit(conn, tableName, streamName, regionInfoA, regionInfoB);
+
+                upsertDaughterPartitions(conn, tableName, streamName, ancestorIDs.subList(0, 1),
+                        Arrays.asList(regionInfoA, regionInfoB));
+
+                updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs,
+                        regionInfoA.getRegionId());
+            } else {
+                LOGGER.info("{} does not have a stream enabled, skipping partition metadata update.",
+                        regionInfoA.getTable());
             }
} catch (SQLException e) {
LOGGER.error("Unable to update CDC Stream Partition metadata
during split with daughter regions: {} {}",
@@ -116,31 +125,68 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
}
}
-    private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException {
-        PTable pTable;
-        try {
-            pTable = PhoenixRuntime.getTable(conn, tableName.toString());
-        } catch (TableNotFoundException e) {
-            return null;
+    /**
+     * Update parent -> daughter relationship for CDC Streams when regions merge.
+     * - upsert partition metadata for the daughter with each parent
+     * - update the end time on all the parents' partition metadata
+     * @param c the environment to interact with the framework and master
+     * @param regionsToMerge parent regions which merged
+     * @param mergedRegion daughter region
+     */
+    @Override
+    public void postCompletedMergeRegionsAction(final ObserverContext<MasterCoprocessorEnvironment> c,
+                                                final RegionInfo[] regionsToMerge,
+                                                final RegionInfo mergedRegion) {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
+            // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, mergedRegion.getTable());
+            if (phoenixTable == null) {
+                LOGGER.info("{} is not a Phoenix Table, skipping partition metadata update.",
+                        mergedRegion.getTable());
+                return;
+            }
+            String tableName = phoenixTable.getName().getString();
+            String streamName = getStreamName(conn, tableName);
+            if (streamName != null) {
+                LOGGER.info("Updating merged partition metadata for table={}, stream={} daughter {}",
+                        tableName, streamName, mergedRegion.getEncodedName());
+                // upsert a row for daughter-parent for each merged region
+                upsertDaughterPartitions(conn, tableName, streamName,
+                        Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName).collect(Collectors.toList()),
+                        Arrays.asList(mergedRegion));
+
+                // lookup all ancestors of a merged region and update the endTime
+                for (RegionInfo ri : regionsToMerge) {
+                    List<String> ancestorIDs = getAncestorIdsForMerge(conn, tableName, streamName, ri);
+                    updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs,
+                            mergedRegion.getRegionId());
+                }
+            } else {
+                LOGGER.info("{} does not have a stream enabled, skipping partition metadata update.",
+                        mergedRegion.getTable());
+            }
+        } catch (SQLException e) {
+            LOGGER.error("Unable to update CDC Stream Partition metadata during merge with " +
+                    "parent regions: {} and daughter region {}",
+                    regionsToMerge, mergedRegion.getEncodedName(), e);
         }
-        return pTable;
-    }
/**
-     * Lookup parent's partition id (region's encoded name) in SYSTEM.CDC_STREAM.
+     * Lookup a split parent's partition id (region's encoded name) in SYSTEM.CDC_STREAM.
      * RegionInfoA is left daughter and RegionInfoB is right daughter so parent's key range would
      * be [RegionInfoA startKey, RegionInfoB endKey]
-     * Return both parent and grandparent partition ids.
+     * Return parent and all grandparent partition ids.
      *
-     * TODO: When we implement merges in this coproc, there could be multiple grandparents.
      */
-    private List<String> getAncestorIds(Connection conn, String tableName, String streamName,
+    private List<String> getAncestorIdsForSplit(Connection conn, String tableName, String streamName,
                                         RegionInfo regionInfoA, RegionInfo regionInfoB)
             throws SQLException {
byte[] parentStartKey = regionInfoA.getStartKey();
byte[] parentEndKey = regionInfoB.getEndKey();
- StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY);
+ StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY_FOR_SPLIT);
if (parentStartKey.length == 0) {
qb.append(" AND PARTITION_START_KEY IS NULL ");
} else {
@@ -173,50 +219,115 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
                     Bytes.toStringBinary(regionInfoB.getStartKey()),
                     Bytes.toStringBinary(regionInfoB.getEndKey())));
         }
+        // if parent was a result of a merge, there will be multiple grandparents.
+ while (rs.next()) {
+ ancestorIDs.add(rs.getString(2));
+ }
return ancestorIDs;
}
/**
- * Insert partition metadata for a daughter region from the split.
+ * Lookup the parent of a merged region.
+     * If the merged region was an output of a merge in the past, it will have multiple parents.
*/
-    private void upsertDaughterPartition(Connection conn, String tableName,
-                                         String streamName, String parentPartitionID,
-                                         RegionInfo regionInfo)
-            throws SQLException {
-        String partitionId = regionInfo.getEncodedName();
-        long startTime = regionInfo.getRegionId();
-        byte[] startKey = regionInfo.getStartKey();
-        byte[] endKey = regionInfo.getEndKey();
-        PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
+    private List<String> getAncestorIdsForMerge(Connection conn, String tableName, String streamName,
+                                                RegionInfo parent) throws SQLException {
+        List<String> ancestorIDs = new ArrayList<>();
+        ancestorIDs.add(parent.getEncodedName());
+        PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE);
         pstmt.setString(1, tableName);
         pstmt.setString(2, streamName);
-        pstmt.setString(3, partitionId);
-        pstmt.setString(4, parentPartitionID);
-        pstmt.setLong(5, startTime);
-        // endTime in not set when inserting a new partition
-        pstmt.setNull(6, Types.BIGINT);
-        pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
-        pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
-        pstmt.executeUpdate();
+        pstmt.setString(3, parent.getEncodedName());
+        ResultSet rs = pstmt.executeQuery();
+        if (rs.next()) {
+            ancestorIDs.add(rs.getString(1));
+        } else {
+            throw new SQLException(String.format(
+                "Could not find parent of the provided merged region: {}", parent.getEncodedName()));
+        }
+        // if parent was a result of a merge, there will be multiple grandparents.
+        while (rs.next()) {
+            ancestorIDs.add(rs.getString(1));
+        }
+        return ancestorIDs;
+    }
+
+ /**
+ * Insert partition metadata for a daughter region from a split or a merge.
+ * split: 2 daughters, 1 parent
+ * merge: 1 daughter, N parents
+ */
+    private void upsertDaughterPartitions(Connection conn, String tableName,
+                                          String streamName, List<String> parentPartitionIDs,
+                                          List<RegionInfo> daughters)
+            throws SQLException {
+ conn.setAutoCommit(false);
+ PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
+ for (RegionInfo daughter : daughters) {
+ for (String parentPartitionID : parentPartitionIDs) {
+ String partitionId = daughter.getEncodedName();
+ long startTime = daughter.getRegionId();
+ byte[] startKey = daughter.getStartKey();
+ byte[] endKey = daughter.getEndKey();
+ pstmt.setString(1, tableName);
+ pstmt.setString(2, streamName);
+ pstmt.setString(3, partitionId);
+ pstmt.setString(4, parentPartitionID);
+ pstmt.setLong(5, startTime);
+                // endTime is not set when inserting a new partition
+ pstmt.setNull(6, Types.BIGINT);
+ pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
+ pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
+ pstmt.executeUpdate();
+ }
+ }
conn.commit();
}
/**
     * Update endTime in all rows of parent partition by setting it to daughter's startTime.
+     * parent came from a split : there will only be one record
+     * parent came from a merge : there will be multiple rows, one per grandparent
      *
-     * TODO: When we implement merges in this coproc, update all rows of the parent.
      */
    private void updateParentPartitionEndTime(Connection conn, String tableName,
                                              String streamName, List<String> ancestorIDs,
-                                             long daughterStartTime) throws SQLException {
-        // ancestorIDs = [parentID, grandparentID]
+                                             long daughterStartTime) throws SQLException {
+        conn.setAutoCommit(false);
+        // ancestorIDs = [parentID, grandparentID1, grandparentID2...]
         PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL);
- pstmt.setString(1, tableName);
- pstmt.setString(2, streamName);
- pstmt.setString(3, ancestorIDs.get(0));
- pstmt.setString(4, ancestorIDs.get(1));
- pstmt.setLong(5, daughterStartTime);
- pstmt.executeUpdate();
+ for (int i=1; i<ancestorIDs.size(); i++) {
+ pstmt.setString(1, tableName);
+ pstmt.setString(2, streamName);
+ pstmt.setString(3, ancestorIDs.get(0));
+ pstmt.setString(4, ancestorIDs.get(i));
+ pstmt.setLong(5, daughterStartTime);
+ pstmt.executeUpdate();
+ }
conn.commit();
}
+
+ /**
+ * Get the stream name on the given table if one exists in ENABLED state.
+ */
+    private String getStreamName(Connection conn, String tableName) throws SQLException {
+ PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
+ pstmt.setString(1, tableName);
+ ResultSet rs = pstmt.executeQuery();
+ if (rs.next()) {
+ return rs.getString(1);
+ } else {
+ return null;
+ }
+ }
+
+    private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException {
+ PTable pTable;
+ try {
+ pTable = PhoenixRuntime.getTable(conn, tableName.toString());
+ } catch (TableNotFoundException e) {
+ return null;
+ }
+ return pTable;
+ }
}
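To make the bookkeeping above concrete, here is a small self-contained sketch (illustrative only, not part of this commit) of the rows the observer maintains on a merge: one daughter row is upserted per parent region, and every open row of each parent gets its end time set to the daughter's start time. The class and field names are stand-ins for SYSTEM.CDC_STREAM columns, and the close-out loop is a simplified model of updateParentPartitionEndTime.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MergeBookkeepingSketch {
        // Illustrative stand-in for one SYSTEM.CDC_STREAM row.
        static class PartitionRow {
            final String partitionId;
            final String parentPartitionId;
            final long startTime;
            Long endTime; // null while the partition is still open
            PartitionRow(String partitionId, String parentPartitionId, long startTime) {
                this.partitionId = partitionId;
                this.parentPartitionId = parentPartitionId;
                this.startTime = startTime;
            }
        }

        public static void main(String[] args) {
            List<PartitionRow> rows = new ArrayList<>();
            // Two open parent partitions, e.g. the daughters of an earlier split of p0.
            rows.add(new PartitionRow("p1", "p0", 100L));
            rows.add(new PartitionRow("p2", "p0", 100L));

            // Regions p1 and p2 merge into daughter d1 at time 200.
            String daughter = "d1";
            long daughterStartTime = 200L;
            List<String> parents = Arrays.asList("p1", "p2");

            // One daughter row per parent, end time left open (mirrors upsertDaughterPartitions).
            for (String parent : parents) {
                rows.add(new PartitionRow(daughter, parent, daughterStartTime));
            }
            // Close out every open row of each parent (mirrors updateParentPartitionEndTime).
            for (PartitionRow row : rows) {
                if (parents.contains(row.partitionId) && row.endTime == null) {
                    row.endTime = daughterStartTime;
                }
            }

            for (PartitionRow row : rows) {
                System.out.println(row.partitionId + " <- " + row.parentPartitionId
                        + " [" + row.startTime + ", " + row.endTime + ")");
            }
        }
    }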
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index d70821bb94..98dd22f535 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -18,13 +18,8 @@
package org.apache.phoenix.end2end;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
@@ -33,12 +28,12 @@ import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -49,8 +44,12 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
@@ -172,7 +171,7 @@ public class CDCStreamIT extends CDCBaseIT {
createTableAndEnableCDC(conn, tableName);
//split the only region somewhere in the middle
- splitTable(conn, tableName, Bytes.toBytes("m"));
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
         //check partition metadata - daughter regions are inserted and parent's end time is updated.
ResultSet rs = conn.createStatement().executeQuery(
@@ -209,9 +208,9 @@ public class CDCStreamIT extends CDCBaseIT {
createTableAndEnableCDC(conn, tableName);
//split the only region [null, null]
- splitTable(conn, tableName, Bytes.toBytes("l"));
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
// we have 2 regions - [null, l], [l, null], split the first region
- splitTable(conn, tableName, Bytes.toBytes("d"));
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
ResultSet rs = conn.createStatement().executeQuery(
"SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" +
tableName + "'");
PartitionMetadata grandparent = null, splitParent = null,
unSplitParent = null;
@@ -253,9 +252,9 @@ public class CDCStreamIT extends CDCBaseIT {
createTableAndEnableCDC(conn, tableName);
//split the only region [null, null]
- splitTable(conn, tableName, Bytes.toBytes("l"));
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
// we have 2 regions - [null, l], [l, null], split the second region
- splitTable(conn, tableName, Bytes.toBytes("q"));
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));
ResultSet rs = conn.createStatement().executeQuery(
"SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" +
tableName + "'");
PartitionMetadata grandparent = null, splitParent = null,
unSplitParent = null;
@@ -297,11 +296,11 @@ public class CDCStreamIT extends CDCBaseIT {
createTableAndEnableCDC(conn, tableName);
//split the only region [null, null]
- splitTable(conn, tableName, Bytes.toBytes("d"));
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
// we have 2 regions - [null, d], [d, null], split the second region
- splitTable(conn, tableName, Bytes.toBytes("q"));
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));
// we have 3 regions - [null, d], [d, q], [q, null], split the second
region
- splitTable(conn, tableName, Bytes.toBytes("j"));
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("j"));
// [null, d], [d, j], [j, q], [q, null]
ResultSet rs = conn.createStatement().executeQuery(
"SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" +
tableName + "'");
@@ -324,6 +323,160 @@ public class CDCStreamIT extends CDCBaseIT {
assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'j' &&
d.endKey[0] == 'q'));
}
+ /**
+ * Test split of a region which came from a merge.
+ */
+ @Test
+ public void testPartitionMetadataMergedRegionSplits() throws Exception {
+ // create table, cdc and bootstrap stream metadata
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ createTableAndEnableCDC(conn, tableName);
+
+ //split the only region
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
+
+ //merge the 2 regions
+        List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
+ TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+ .map(HRegionLocation::getRegion)
+ .map(RegionInfo::getEncodedName)
+ .collect(Collectors.toList()));
+
+ //split again
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
+
+ //verify partition metadata
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" +
tableName + "'");
+ List<PartitionMetadata> mergedParent = new ArrayList<>();
+ List<PartitionMetadata> splitDaughters = new ArrayList<>();
+
+ while (rs.next()) {
+ PartitionMetadata pm = new PartitionMetadata(rs);
+ if (pm.startKey == null && pm.endKey == null &&
pm.parentPartitionId != null) {
+ mergedParent.add(pm);
+ }
+ if (pm.endTime == 0) {
+ splitDaughters.add(pm);
+ }
+ }
+ assertEquals(2, mergedParent.size());
+ assertEquals(2, splitDaughters.size());
+ assertEquals(mergedParent.get(0).partitionId,
mergedParent.get(1).partitionId);
+ assertEquals(mergedParent.get(0).partitionId,
splitDaughters.get(0).parentPartitionId);
+ assertEquals(mergedParent.get(0).partitionId,
splitDaughters.get(1).parentPartitionId);
+ assertEquals(splitDaughters.get(0).startTime,
splitDaughters.get(1).startTime);
+ assertEquals(splitDaughters.get(0).startTime,
mergedParent.get(0).endTime);
+ assertEquals(splitDaughters.get(0).startTime,
mergedParent.get(1).endTime);
+ }
+
+
+
+ /**
+ * Test merge of 2 regions which came from a split.
+ */
+ @Test
+ public void testPartitionMetadataSplitRegionsMerge() throws Exception {
+ // create table, cdc and bootstrap stream metadata
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ createTableAndEnableCDC(conn, tableName);
+
+ //split the only region
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
+
+ //merge the 2 regions
+        List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
+        TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+
+ //verify partition metadata
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" +
tableName + "'");
+
+ List<PartitionMetadata> splitParents = new ArrayList<>();
+ List<PartitionMetadata> mergedDaughter = new ArrayList<>();
+ while (rs.next()) {
+ PartitionMetadata pm = new PartitionMetadata(rs);
+ if (pm.startKey == null && pm.endKey == null && pm.endTime == 0) {
+ mergedDaughter.add(pm);
+ }
+ if (pm.startKey != null || pm.endKey != null) {
+ splitParents.add(pm);
+ }
+ }
+ assertEquals(2, mergedDaughter.size());
+ assertEquals(2, splitParents.size());
+ assertEquals(mergedDaughter.get(0).startTime,
mergedDaughter.get(1).startTime);
+ assertEquals(mergedDaughter.get(0).endTime,
mergedDaughter.get(1).endTime);
+ assertEquals(mergedDaughter.get(0).partitionId,
mergedDaughter.get(1).partitionId);
+ assertTrue(mergedDaughter.stream().anyMatch(d ->
Objects.equals(d.parentPartitionId, splitParents.get(0).partitionId)));
+ assertTrue(mergedDaughter.stream().anyMatch(d ->
Objects.equals(d.parentPartitionId, splitParents.get(1).partitionId)));
+ for (PartitionMetadata splitDaughter : splitParents) {
+ Assert.assertEquals(mergedDaughter.get(0).startTime,
splitDaughter.endTime);
+ }
+ }
+
+ /**
+ * Test merge of 2 regions which came from different merges.
+ */
+ @Test
+ public void testPartitionMetadataMergedRegionsMerge() throws Exception {
+ // create table, cdc and bootstrap stream metadata
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ createTableAndEnableCDC(conn, tableName);
+
+ // split the only region
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
+ // split both regions
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
+ TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));
+ // merge first two and last two regions
+        List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
+        TestUtil.mergeTableRegions(conn, tableName, regions.subList(0,2).stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+        TestUtil.mergeTableRegions(conn, tableName, regions.subList(2,4).stream()
+ .map(HRegionLocation::getRegion)
+ .map(RegionInfo::getEncodedName)
+ .collect(Collectors.toList()));
+ // merge the two regions
+ regions = TestUtil.getAllTableRegions(conn, tableName);
+ TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+ .map(HRegionLocation::getRegion)
+ .map(RegionInfo::getEncodedName)
+ .collect(Collectors.toList()));
+
+ //verify partition metadata
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" +
tableName + "'");
+
+ List<PartitionMetadata> mergedDaughter = new ArrayList<>();
+ List<PartitionMetadata> mergedParents = new ArrayList<>();
+ while (rs.next()) {
+ PartitionMetadata pm = new PartitionMetadata(rs);
+ if (pm.endTime == 0) {
+ mergedDaughter.add(pm);
+ }
+ // this will add extra rows, we will prune later
+ else if (pm.startKey == null || pm.endKey == null) {
+ mergedParents.add(pm);
+ }
+ }
+ assertEquals(2, mergedDaughter.size());
+ assertEquals(9, mergedParents.size());
+ assertEquals(mergedDaughter.get(0).startTime,
mergedDaughter.get(1).startTime);
+ Collections.sort(mergedParents, Comparator.comparing(o -> o.endTime));
+ for (PartitionMetadata mergedParent :
mergedParents.subList(mergedParents.size()-4, mergedParents.size())) {
+ assertEquals(mergedDaughter.get(0).startTime,
mergedParent.endTime);
+ }
+ }
+
private String getStreamName(Connection conn, String tableName, String
cdcName) throws SQLException {
return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName,
CDCUtil.getCDCCreationTimestamp(
conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
@@ -381,31 +534,6 @@ public class CDCStreamIT extends CDCBaseIT {
conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('z', 8, 'cat')");
}
- /**
- * Split the table at the provided split point.
- */
- private void splitTable(Connection conn, String tableName, byte[]
splitPoint) throws Exception {
- ConnectionQueryServices services =
conn.unwrap(PhoenixConnection.class).getQueryServices();
- Admin admin = services.getAdmin();
- Configuration configuration =
-
conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
- org.apache.hadoop.hbase.client.Connection hbaseConn =
- ConnectionFactory.createConnection(configuration);
- RegionLocator regionLocator =
hbaseConn.getRegionLocator(TableName.valueOf(tableName));
- int nRegions = regionLocator.getAllRegionLocations().size();
- try {
- admin.split(TableName.valueOf(tableName), splitPoint);
- int retryCount = 0;
- do {
- Thread.sleep(2000);
- retryCount++;
- } while (retryCount < 10 &&
regionLocator.getAllRegionLocations().size() == nRegions);
-
Assert.assertNotEquals(regionLocator.getAllRegionLocations().size(), nRegions);
- } finally {
- admin.close();
- }
- }
-
/**
* Inner class to represent partition metadata for a region i.e. single
row from SYSTEM.CDC_STREAM
*/
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 6ea2a2eb65..969dc803cf 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -59,22 +59,26 @@ import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.CoprocessorDescriptor;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -220,7 +224,7 @@ public class TestUtil {
public final static String ROW7 = "00B723122312312";
public final static String ROW8 = "00B823122312312";
public final static String ROW9 = "00C923122312312";
-
+
public final static String PARENTID1 = "0500x0000000001";
public final static String PARENTID2 = "0500x0000000002";
public final static String PARENTID3 = "0500x0000000003";
@@ -230,9 +234,9 @@ public class TestUtil {
public final static String PARENTID7 = "0500x0000000007";
public final static String PARENTID8 = "0500x0000000008";
public final static String PARENTID9 = "0500x0000000009";
-
+
public final static List<String> PARENTIDS = Lists.newArrayList(PARENTID1,
PARENTID2, PARENTID3, PARENTID4, PARENTID5, PARENTID6, PARENTID7, PARENTID8,
PARENTID9);
-
+
public final static String ENTITYHISTID1 = "017x00000000001";
public final static String ENTITYHISTID2 = "017x00000000002";
public final static String ENTITYHISTID3 = "017x00000000003";
@@ -244,7 +248,7 @@ public class TestUtil {
public final static String ENTITYHISTID9 = "017x00000000009";
public final static List<String> ENTITYHISTIDS =
Lists.newArrayList(ENTITYHISTID1, ENTITYHISTID2, ENTITYHISTID3, ENTITYHISTID4,
ENTITYHISTID5, ENTITYHISTID6, ENTITYHISTID7, ENTITYHISTID8, ENTITYHISTID9);
-
+
public static final String LOCALHOST = "localhost";
public static final String PHOENIX_JDBC_URL = JDBC_PROTOCOL +
JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_TERMINATOR +
PHOENIX_TEST_DRIVER_URL_PARAM;
public static final String PHOENIX_CONNECTIONLESS_JDBC_URL = JDBC_PROTOCOL
+ JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR +
PHOENIX_TEST_DRIVER_URL_PARAM;
@@ -897,7 +901,7 @@ public class TestUtil {
if (table.isTransactional()) {
mutationState.commit();
}
-
+
Admin hbaseAdmin = services.getAdmin();
hbaseAdmin.flush(TableName.valueOf(tableName));
hbaseAdmin.majorCompact(TableName.valueOf(tableName));
@@ -910,7 +914,7 @@ public class TestUtil {
scan.withStartRow(markerRowKey);
scan.withStopRow(Bytes.add(markerRowKey, new byte[]{0}));
scan.setRaw(true);
-
+
try (Table htableForRawScan =
services.getTable(Bytes.toBytes(tableName))) {
ResultScanner scanner = htableForRawScan.getScanner(scan);
List<Result> results = Lists.newArrayList(scanner);
@@ -1450,4 +1454,71 @@ public class TestUtil {
return
Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")),
null);
}
+ /**
+ * Split the table at the provided split point.
+ */
+    public static void splitTable(Connection conn, String tableName, byte[] splitPoint)
+            throws Exception {
+        executeHBaseTableRegionOperation(conn, tableName, (admin, regionLocator, nRegions) -> {
+ admin.split(TableName.valueOf(tableName), splitPoint);
+ waitForRegionChange(regionLocator, nRegions);
+ });
+ }
+
+ /**
+ * Merge the given regions of a table.
+ */
+    public static void mergeTableRegions(Connection conn, String tableName, List<String> regions)
+            throws Exception {
+        byte[][] regionsToMerge = regions.stream()
+                .map(String::getBytes)
+                .toArray(byte[][]::new);
+
+        executeHBaseTableRegionOperation(conn, tableName, (admin, regionLocator, nRegions) -> {
+ admin.mergeRegionsAsync(regionsToMerge, true);
+ waitForRegionChange(regionLocator, nRegions);
+ });
+ }
+
+ @FunctionalInterface
+ private interface TableOperation {
+        void execute(Admin admin, RegionLocator regionLocator, int initialRegionCount)
+                throws Exception;
+ }
+
+    private static void executeHBaseTableRegionOperation(Connection conn, String tableName,
+            TableOperation operation) throws Exception {
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        Configuration configuration = services.getConfiguration();
+        org.apache.hadoop.hbase.client.Connection hbaseConn
+                = ConnectionFactory.createConnection(configuration);
+        Admin admin = services.getAdmin();
+        RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+        int nRegions = regionLocator.getAllRegionLocations().size();
+        operation.execute(admin, regionLocator, nRegions);
+
+    }
+
+    private static void waitForRegionChange(RegionLocator regionLocator, int initialRegionCount)
+            throws Exception {
+        int retryCount = 0;
+        while (retryCount < 20
+                && regionLocator.getAllRegionLocations().size() == initialRegionCount) {
+            Thread.sleep(5000);
+            retryCount++;
+        }
+        Assert.assertNotEquals(regionLocator.getAllRegionLocations().size(), initialRegionCount);
+    }
+
+    public static List<HRegionLocation> getAllTableRegions(Connection conn, String tableName)
+            throws Exception {
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        Configuration configuration = services.getConfiguration();
+        RegionLocator regionLocator;
+        org.apache.hadoop.hbase.client.Connection hbaseConn
+                = ConnectionFactory.createConnection(configuration);
+        regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+        return regionLocator.getAllRegionLocations();
+    }
+
}
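A rough usage sketch of the new TestUtil helpers (illustrative only; it mirrors how CDCStreamIT drives them above): split a table once, then merge the resulting regions back together. Both helpers block until the region count observed by the RegionLocator changes; the split point and class name here are placeholders.

    import java.sql.Connection;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.TestUtil;

    public class RegionHelperUsageSketch {
        // Split the table once, then merge the two resulting regions back together.
        static void splitThenMerge(Connection conn, String tableName) throws Exception {
            TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
            List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
            TestUtil.mergeTableRegions(conn, tableName, regions.stream()
                    .map(HRegionLocation::getRegion)
                    .map(RegionInfo::getEncodedName)
                    .collect(Collectors.toList()));
        }
    }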