This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 1bc2d0c245 PHOENIX-7478 HBase 3 compatibility changes: Replace ClusterConnection with Connection API (#2123) 1bc2d0c245 is described below commit 1bc2d0c2457de759ed2ee7713895c47d4a524c05 Author: Istvan Toth <st...@apache.org> AuthorDate: Mon Apr 28 07:42:49 2025 +0200 PHOENIX-7478 HBase 3 compatibility changes: Replace ClusterConnection with Connection API (#2123) Co-authored-by: Villő Szűcs <szucsvi...@gmail.com> --- .../phoenix/query/ConnectionQueryServicesImpl.java | 17 +++++--- .../org/apache/phoenix/schema/MetaDataClient.java | 45 +++++++++++++--------- .../coprocessor/MetaDataRegionObserver.java | 44 +++++++++++---------- .../wal/WALRecoveryRegionPostOpenIT.java | 3 +- .../apache/phoenix/end2end/AggregateQueryIT.java | 3 +- .../end2end/AggregateQueryWithRegionMovesIT.java | 3 +- 6 files changed, 64 insertions(+), 51 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 3031c2cd1c..762eccf054 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -156,7 +156,6 @@ import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.CheckAndMutate; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; @@ -798,7 +797,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @Override public void clearTableRegionCache(TableName tableName) throws SQLException 
{ - ((ClusterConnection)connection).clearRegionCache(tableName); + try { + connection.getRegionLocator(tableName).clearRegionLocationCache(); + } catch (IOException e) { + LOGGER.info("Exception while clearing table region cache", e); + //TODO allow passing cause to TableNotFoundException + throw new TableNotFoundException(tableName.toString()); + } } public byte[] getNextRegionStartKey(HRegionLocation regionLocation, byte[] currentKey, @@ -902,8 +907,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement currentKey = startRowKey; do { HRegionLocation regionLocation = - ((ClusterConnection) connection).getRegionLocation(table, - currentKey, false); + connection.getRegionLocator(table).getRegionLocation(currentKey, false); currentKey = getNextRegionStartKey(regionLocation, currentKey, prevRegionLocation); locations.add(regionLocation); @@ -2225,8 +2229,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement long startTime = EnvironmentEdgeManager.currentTimeMillis(); while (true) { if (retried) { - ((ClusterConnection) connection).relocateRegion( - SchemaUtil.getPhysicalName(systemTableName, this.getProps()), tableKey); + connection.getRegionLocator(SchemaUtil.getPhysicalName( + systemTableName, this.getProps())) + .getRegionLocation(tableKey, true); } Table ht = this.getTable(SchemaUtil.getPhysicalName(systemTableName, this.getProps()).getName()); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index ac222db4a2..726e77ac4a 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -186,7 +186,6 @@ import org.apache.phoenix.schema.task.SystemTaskParams; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import 
org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Delete; @@ -6562,7 +6561,7 @@ public class MetaDataClient { LOGGER.info(changePermsStatement.toString()); try(Admin admin = connection.getQueryServices().getAdmin()) { - ClusterConnection clusterConnection = (ClusterConnection) admin.getConnection(); + org.apache.hadoop.hbase.client.Connection hConnection = admin.getConnection(); if (changePermsStatement.getSchemaName() != null) { // SYSTEM.CATALOG doesn't have any entry for "default" HBase namespace, hence we will bypass the check @@ -6572,7 +6571,7 @@ public class MetaDataClient { connection); } - changePermsOnSchema(clusterConnection, changePermsStatement); + changePermsOnSchema(hConnection, changePermsStatement); } else if (changePermsStatement.getTableName() != null) { PTable inputTable = connection.getTable(SchemaUtil. 
normalizeFullTableName(changePermsStatement.getTableName().toString())); @@ -6582,11 +6581,11 @@ public class MetaDataClient { // Changing perms on base table and update the perms for global and view indexes // Views and local indexes are not physical tables and hence update perms is not needed - changePermsOnTables(clusterConnection, admin, changePermsStatement, inputTable); + changePermsOnTables(hConnection, admin, changePermsStatement, inputTable); } else { // User can be given perms at the global level - changePermsOnUser(clusterConnection, changePermsStatement); + changePermsOnUser(hConnection, changePermsStatement); } } catch (SQLException e) { @@ -6601,20 +6600,25 @@ public class MetaDataClient { return new MutationState(0, 0, connection); } - private void changePermsOnSchema(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement) throws Throwable { + private void changePermsOnSchema(org.apache.hadoop.hbase.client.Connection hConnection, + ChangePermsStatement changePermsStatement) throws Throwable { if (changePermsStatement.isGrantStatement()) { - AccessControlClient.grant(clusterConnection, changePermsStatement.getSchemaName(), changePermsStatement.getName(), changePermsStatement.getPermsList()); + AccessControlClient.grant(hConnection, changePermsStatement.getSchemaName(), + changePermsStatement.getName(), changePermsStatement.getPermsList()); } else { - AccessControlClient.revoke(clusterConnection, changePermsStatement.getSchemaName(), changePermsStatement.getName(), Permission.Action.values()); + AccessControlClient.revoke(hConnection, changePermsStatement.getSchemaName(), + changePermsStatement.getName(), Permission.Action.values()); } } - private void changePermsOnTables(ClusterConnection clusterConnection, Admin admin, ChangePermsStatement changePermsStatement, PTable inputTable) throws Throwable { + private void changePermsOnTables(org.apache.hadoop.hbase.client.Connection hConnection, + Admin admin, ChangePermsStatement 
changePermsStatement, + PTable inputTable) throws Throwable { org.apache.hadoop.hbase.TableName tableName = SchemaUtil.getPhysicalTableName (inputTable.getPhysicalName().getBytes(), inputTable.isNamespaceMapped()); - changePermsOnTable(clusterConnection, changePermsStatement, tableName); + changePermsOnTable(hConnection, changePermsStatement, tableName); boolean schemaInconsistency = false; List<PTable> inconsistentTables = null; @@ -6635,7 +6639,7 @@ public class MetaDataClient { LOGGER.info("Updating permissions for Index Table: " + indexTable.getName() + " Base Table: " + inputTable.getName()); tableName = SchemaUtil.getPhysicalTableName(indexTable.getPhysicalName().getBytes(), indexTable.isNamespaceMapped()); - changePermsOnTable(clusterConnection, changePermsStatement, tableName); + changePermsOnTable(hConnection, changePermsStatement, tableName); } if (schemaInconsistency) { @@ -6653,7 +6657,7 @@ public class MetaDataClient { if (viewIndexTableExists) { LOGGER.info("Updating permissions for View Index Table: " + Bytes.toString(viewIndexTableBytes) + " Base Table: " + inputTable.getName()); - changePermsOnTable(clusterConnection, changePermsStatement, tableName); + changePermsOnTable(hConnection, changePermsStatement, tableName); } else { if (inputTable.isMultiTenant()) { LOGGER.error("View Index Table not found for MultiTenant Table: " + inputTable.getName()); @@ -6664,23 +6668,28 @@ public class MetaDataClient { } } - private void changePermsOnTable(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement, org.apache.hadoop.hbase.TableName tableName) + private void changePermsOnTable(org.apache.hadoop.hbase.client.Connection hConnection, + ChangePermsStatement changePermsStatement, + org.apache.hadoop.hbase.TableName tableName) throws Throwable { if (changePermsStatement.isGrantStatement()) { - AccessControlClient.grant(clusterConnection, tableName, changePermsStatement.getName(), + AccessControlClient.grant(hConnection, tableName, 
changePermsStatement.getName(), null, null, changePermsStatement.getPermsList()); } else { - AccessControlClient.revoke(clusterConnection, tableName, changePermsStatement.getName(), + AccessControlClient.revoke(hConnection, tableName, changePermsStatement.getName(), null, null, Permission.Action.values()); } } - private void changePermsOnUser(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement) + private void changePermsOnUser(org.apache.hadoop.hbase.client.Connection hConnection, + ChangePermsStatement changePermsStatement) throws Throwable { if (changePermsStatement.isGrantStatement()) { - AccessControlClient.grant(clusterConnection, changePermsStatement.getName(), changePermsStatement.getPermsList()); + AccessControlClient.grant(hConnection, changePermsStatement.getName(), + changePermsStatement.getPermsList()); } else { - AccessControlClient.revoke(clusterConnection, changePermsStatement.getName(), Permission.Action.values()); + AccessControlClient.revoke(hConnection, changePermsStatement.getName(), + Permission.Action.values()); } } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 0562dfea81..a9ea23a3fe 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -42,10 +42,11 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import 
org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -56,12 +57,9 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.ipc.RemoteException; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.compile.PostDDLCompiler; @@ -677,22 +675,26 @@ public class MetaDataRegionObserver implements RegionObserver,RegionCoprocessor } public static boolean tableRegionsOnline(Configuration conf, PTable table) { - try (ClusterConnection hcon = - (ClusterConnection) ConnectionFactory.createConnection(conf)) { - List<HRegionLocation> locations = hcon.locateRegions( - org.apache.hadoop.hbase.TableName.valueOf(table.getPhysicalName().getBytes())); - - for (HRegionLocation loc : locations) { + try (Connection hcon = ConnectionFactory.createConnection(conf)) { + Admin admin = hcon.getAdmin(); + List<RegionInfo> regionInfos = admin.getRegions(TableName.valueOf( + table.getPhysicalName().getBytes())); + // This makes Number of Regions RPC calls sequentially. + // For large tables this can be slow. 
+ for (RegionInfo regionInfo : regionInfos) { try { - ServerName sn = loc.getServerName(); - if (sn == null) continue; - - AdminProtos.AdminService.BlockingInterface admin = hcon.getAdmin(sn); - HBaseRpcController controller = hcon.getRpcControllerFactory().newController(); - org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.getRegionInfo(controller, - admin, loc.getRegion().getRegionName()); - } catch (RemoteException e) { - LOGGER.debug("Cannot get region " + loc.getRegion().getEncodedName() + " info due to error:" + e); + // We don't actually care about the compaction state, we are only calling this + // because this will trigger a call to the RS (from master), and we want to make + // sure that all RSs are available + // There are only a few methods in HBase 3.0 that are directly calling the RS, + // this is one of them. + admin.getCompactionStateForRegion(regionInfo.getRegionName()); + // This used to make a direct RPC call to the region, but HBase 3 makes that + // very hard (needs reflection, or a bridge class in the same package), + // and it's not necessary for checking the RS liveness + } catch (IOException e) { + LOGGER.debug("Cannot get region " + regionInfo.getEncodedName() + + " info due to error:" + e); return false; } } diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java index 172dc85e0c..36f6c59fb0 100644 --- a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java +++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ClusterConnection; import 
org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -259,7 +258,7 @@ public class WALRecoveryRegionPostOpenIT extends BaseTest { scan = new Scan(); primaryTable.close(); primaryTable = hbaseConn.getTable(TableName.valueOf(DATA_TABLE_NAME)); - ((ClusterConnection)hbaseConn).clearRegionLocationCache(); + hbaseConn.clearRegionLocationCache(); resultScanner = primaryTable.getScanner(scan); count = 0; for (Result result : resultScanner) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java index 03df8b2946..6cf51d1194 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java @@ -38,7 +38,6 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.util.Bytes; @@ -108,8 +107,8 @@ public class AggregateQueryIT extends BaseQueryIT { admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); Configuration configuration = conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration(); org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(configuration); - ((ClusterConnection)hbaseConn).clearRegionCache(TableName.valueOf(tableName)); RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName)); + regionLocator.clearRegionLocationCache(); int nRegions = regionLocator.getAllRegionLocations().size(); admin.split(tn, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A3"))); int 
retryCount = 0; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryWithRegionMovesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryWithRegionMovesIT.java index 2382d82849..ee1aae8f87 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryWithRegionMovesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryWithRegionMovesIT.java @@ -20,7 +20,6 @@ package org.apache.phoenix.end2end; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.util.Bytes; @@ -135,8 +134,8 @@ public class AggregateQueryWithRegionMovesIT extends BaseQueryWithRegionMovesIT conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration(); org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(configuration); - ((ClusterConnection) hbaseConn).clearRegionCache(TableName.valueOf(tableName)); RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName)); + regionLocator.clearRegionLocationCache(); int nRegions = regionLocator.getAllRegionLocations().size(); admin.split(tn, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A3"))); int retryCount = 0;