This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit fb0d712096b62198e6093a04bade5d289395d4e7 Author: slothever <[email protected]> AuthorDate: Thu Feb 1 19:06:45 2024 +0800 [fix](multi-catalog)access HMS need ugiDoAs (#30595) --- .../datasource/hive/ThriftHMSCachedClient.java | 81 ++++++++++++---------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index abb3fda24b3..1e5e2f1287a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.common.Config; import org.apache.doris.datasource.HMSClientException; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; @@ -56,6 +57,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.security.PrivilegedExceptionAction; import java.util.BitSet; import java.util.Collections; import java.util.LinkedList; @@ -93,7 +95,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public List<String> getAllDatabases() { try (ThriftHMSClient client = getClient()) { try { - return client.client.getAllDatabases(); + return ugiDoAs(client.client::getAllDatabases); } catch (Exception e) { client.setThrowable(e); throw e; @@ -107,7 +109,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public List<String> getAllTables(String dbName) { try (ThriftHMSClient client = getClient()) { try { - return client.client.getAllTables(dbName); + return ugiDoAs(() -> client.client.getAllTables(dbName)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -121,7 +123,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public boolean tableExists(String dbName, String tblName) { try (ThriftHMSClient client = getClient()) { try { - return client.client.tableExists(dbName, tblName); + return ugiDoAs(() -> client.client.tableExists(dbName, tblName)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -142,7 +144,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { short limited = maxListPartitionNum <= Short.MAX_VALUE ? (short) maxListPartitionNum : MAX_LIST_PARTITION_NUM; try (ThriftHMSClient client = getClient()) { try { - return client.client.listPartitionNames(dbName, tblName, limited); + return ugiDoAs(() -> client.client.listPartitionNames(dbName, tblName, limited)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -156,7 +158,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public Partition getPartition(String dbName, String tblName, List<String> partitionValues) { try (ThriftHMSClient client = getClient()) { try { - return client.client.getPartition(dbName, tblName, partitionValues); + return ugiDoAs(() -> client.client.getPartition(dbName, tblName, partitionValues)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -171,7 +173,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public List<Partition> getPartitions(String dbName, String tblName, List<String> partitionNames) { try (ThriftHMSClient client = getClient()) { try { - return client.client.getPartitionsByNames(dbName, tblName, partitionNames); + return ugiDoAs(() -> client.client.getPartitionsByNames(dbName, tblName, partitionNames)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -186,7 +188,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public Database getDatabase(String dbName) { try (ThriftHMSClient client = getClient()) { try { - return client.client.getDatabase(dbName); + return ugiDoAs(() -> client.client.getDatabase(dbName)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -200,7 +202,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public Table getTable(String dbName, String tblName) { try (ThriftHMSClient client = getClient()) { try { - return client.client.getTable(dbName, tblName); + return ugiDoAs(() -> client.client.getTable(dbName, tblName)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -214,7 +216,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public List<FieldSchema> getSchema(String dbName, String tblName) { try (ThriftHMSClient client = getClient()) { try { - return client.client.getSchema(dbName, tblName); + return ugiDoAs(() -> client.client.getSchema(dbName, tblName)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -228,7 +230,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName, List<String> columns) { try (ThriftHMSClient client = getClient()) { try { - return client.client.getTableColumnStatistics(dbName, tblName, columns); + return ugiDoAs(() -> client.client.getTableColumnStatistics(dbName, tblName, columns)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -243,7 +245,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { String dbName, String tblName, List<String> partNames, List<String> columns) { try (ThriftHMSClient client = getClient()) { try { - return client.client.getPartitionColumnStatistics(dbName, tblName, partNames, columns); + return ugiDoAs(() -> client.client.getPartitionColumnStatistics(dbName, tblName, partNames, columns)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -257,7 +259,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public CurrentNotificationEventId getCurrentNotificationEventId() { try (ThriftHMSClient client = getClient()) { try { - return client.client.getCurrentNotificationEventId(); + return ugiDoAs(client.client::getCurrentNotificationEventId); } catch (Exception e) { client.setThrowable(e); throw e; @@ -271,12 +273,12 @@ public class ThriftHMSCachedClient implements HMSCachedClient { @Override public NotificationEventResponse getNextNotification(long lastEventId, - int maxEvents, - IMetaStoreClient.NotificationFilter filter) + int maxEvents, + IMetaStoreClient.NotificationFilter filter) throws MetastoreNotificationFetchException { try (ThriftHMSClient client = getClient()) { try { - return client.client.getNextNotification(lastEventId, maxEvents, filter); + return ugiDoAs(() -> client.client.getNextNotification(lastEventId, maxEvents, filter)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -293,7 +295,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public long openTxn(String user) { try (ThriftHMSClient client = getClient()) { try { - return client.client.openTxn(user); + return ugiDoAs(() -> client.client.openTxn(user)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -307,7 +309,10 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public void commitTxn(long txnId) { try (ThriftHMSClient client = getClient()) { try { - client.client.commitTxn(txnId); + ugiDoAs(() -> { + client.client.commitTxn(txnId); + return null; + }); } catch (Exception e) { client.setThrowable(e); throw e; @@ -319,7 +324,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { @Override public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, - List<String> partitionNames, long timeoutMs) { + List<String> partitionNames, long timeoutMs) { LockRequestBuilder request = new LockRequestBuilder(queryId).setTransactionId(txnId).setUser(user); List<LockComponent> lockComponents = createLockComponentsForRead(tblName, partitionNames); for (LockComponent component : lockComponents) { @@ -328,7 +333,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { try (ThriftHMSClient client = getClient()) { LockResponse response; try { - response = client.client.lock(request.build()); + response = ugiDoAs(() -> client.client.lock(request.build())); } catch (Exception e) { client.setThrowable(e); throw e; @@ -356,20 +361,22 @@ public class ThriftHMSCachedClient implements HMSCachedClient { public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { try (ThriftHMSClient client = getClient()) { try { - // Pass currentTxn as 0L to get the recent snapshot of valid transactions in Hive - // Do not pass currentTransactionId instead as - // it will break Hive's listing of delta directories if major compaction - // deletes delta directories for valid transactions that existed at the time transaction is opened - ValidTxnList validTransactions = client.client.getValidTxns(); - List<TableValidWriteIds> tableValidWriteIdsList = client.client.getValidWriteIds( - Collections.singletonList(fullTableName), validTransactions.toString()); - if (tableValidWriteIdsList.size() != 1) { - throw new Exception("tableValidWriteIdsList's size should be 1"); - } - ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId, - tableValidWriteIdsList); - ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName); - return writeIdList; + return ugiDoAs(() -> { + // Pass currentTxn as 0L to get the recent snapshot of valid transactions in Hive + // Do not pass currentTransactionId instead as + // it will break Hive's listing of delta directories if major compaction + // deletes delta directories for valid transactions that existed at the time transaction is opened + ValidTxnList validTransactions = client.client.getValidTxns(); + List<TableValidWriteIds> tableValidWriteIdsList = client.client.getValidWriteIds( + Collections.singletonList(fullTableName), validTransactions.toString()); + if (tableValidWriteIdsList.size() != 1) { + throw new Exception("tableValidWriteIdsList's size should be 1"); + } + ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId, + tableValidWriteIdsList); + ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName); + return writeIdList; + }); } catch (Exception e) { client.setThrowable(e); throw e; @@ -385,7 +392,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { private LockResponse checkLock(long lockId) { try (ThriftHMSClient client = getClient()) { try { - return client.client.checkLock(lockId); + return ugiDoAs(() -> client.client.checkLock(lockId)); } catch (Exception e) { client.setThrowable(e); throw e; @@ -460,7 +467,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { synchronized (clientPool) { ThriftHMSClient client = clientPool.poll(); if (client == null) { - return new ThriftHMSClient(hiveConf); + return ugiDoAs(() -> new ThriftHMSClient(hiveConf)); } return client; } @@ -468,5 +475,9 @@ public class ThriftHMSCachedClient implements HMSCachedClient { Thread.currentThread().setContextClassLoader(classLoader); } } + + private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) { + return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
