This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 1d89dd76074 [fix](catalog) close connection on refresh (#35426)
(#35743)
1d89dd76074 is described below
commit 1d89dd760741bb46af2366a8124a53acf784cf42
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri May 31 22:51:13 2024 +0800
[fix](catalog) close connection on refresh (#35426) (#35743)
bp #35426
---
.../apache/doris/datasource/InternalCatalog.java | 29 ---------
.../doris/datasource/hive/HMSCachedClient.java | 5 ++
.../doris/datasource/hive/HMSExternalCatalog.java | 8 +++
.../datasource/hive/HiveMetaStoreClientHelper.java | 71 ----------------------
.../doris/datasource/hive/HiveMetadataOps.java | 9 ++-
.../hive/PostgreSQLJdbcHMSCachedClient.java | 5 ++
.../datasource/hive/ThriftHMSCachedClient.java | 18 +++++-
.../datasource/iceberg/IcebergMetadataOps.java | 4 ++
.../datasource/operations/ExternalMetadataOps.java | 5 ++
.../doris/datasource/TestHMSCachedClient.java | 4 ++
10 files changed, 55 insertions(+), 103 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 3bb5cb0bcb8..5c496f0aefe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -71,7 +71,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.HashDistributionInfo;
-import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.InfoSchemaDb;
import org.apache.doris.catalog.JdbcTable;
@@ -136,9 +135,6 @@ import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.es.EsRepository;
-import org.apache.doris.datasource.hive.HMSCachedClient;
-import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.event.DropPartitionEvent;
import org.apache.doris.nereids.trees.plans.commands.info.DropMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
@@ -181,7 +177,6 @@ import lombok.Getter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
@@ -2886,30 +2881,6 @@ public class InternalCatalog implements
CatalogIf<Database> {
return checkCreateTableResult(tableName, tableId, result);
}
- private void createHiveTable(Database db, CreateTableStmt stmt) throws
DdlException {
- String tableName = stmt.getTableName();
- List<Column> columns = stmt.getColumns();
- long tableId = Env.getCurrentEnv().getNextId();
- HiveTable hiveTable = new HiveTable(tableId, tableName, columns,
stmt.getProperties());
- hiveTable.setComment(stmt.getComment());
- // check hive table whether exists in hive database
- HiveConf hiveConf = new HiveConf();
- hiveConf.set(HMSProperties.HIVE_METASTORE_URIS,
-
hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS));
- if
(!Strings.isNullOrEmpty(hiveTable.getHiveProperties().get(HMSProperties.HIVE_VERSION)))
{
- hiveConf.set(HMSProperties.HIVE_VERSION,
hiveTable.getHiveProperties().get(HMSProperties.HIVE_VERSION));
- }
- HMSCachedClient client = HiveMetadataOps.createCachedClient(hiveConf,
1, null);
- if (!client.tableExists(hiveTable.getHiveDb(),
hiveTable.getHiveTable())) {
- throw new DdlException(String.format("Table [%s] dose not exist in
Hive.", hiveTable.getHiveDbTable()));
- }
- // check hive table if exists in doris database
- if (!db.createTableWithLock(hiveTable, false,
stmt.isSetIfNotExists()).first) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR,
tableName);
- }
- LOG.info("successfully create table[{}-{}]", tableName, tableId);
- }
-
private boolean createJdbcTable(Database db, CreateTableStmt stmt) throws
DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index b10bfc39d44..c9d0ce1736b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -110,4 +110,9 @@ public interface HMSCachedClient {
void addPartitions(String dbName, String tableName,
List<HivePartitionWithStatistics> partitions);
void dropPartition(String dbName, String tableName, List<String>
partitionValues, boolean deleteData);
+
+ /**
+ * close the connection, eg, to hms
+ */
+ void close();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 9b4540f3b22..243dfb3c24f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -162,6 +162,14 @@ public class HMSExternalCatalog extends ExternalCatalog {
metadataOps = hiveOps;
}
+ @Override
+ public void onRefresh(boolean invalidCache) {
+ super.onRefresh(invalidCache);
+ if (metadataOps != null) {
+ metadataOps.close();
+ }
+ }
+
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 3ebce966777..e7dfda25c3e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -32,36 +32,25 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopUGI;
import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.thrift.TExprOpcode;
-import com.aliyun.datalake.metastore.common.DataLakeConfig;
-import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -76,7 +65,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import shade.doris.hive.org.apache.thrift.TException;
import java.security.PrivilegedExceptionAction;
import java.time.LocalDateTime;
@@ -148,65 +136,6 @@ public class HiveMetaStoreClientHelper {
}
}
- private static IMetaStoreClient getClient(String metaStoreUris) throws
DdlException {
- HiveConf hiveConf = new HiveConf();
- hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUris);
- hiveConf.set(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
- String.valueOf(Config.hive_metastore_client_timeout_second));
- IMetaStoreClient metaStoreClient = null;
- String type = hiveConf.get(HMSProperties.HIVE_METASTORE_TYPE);
- try {
- if ("dlf".equalsIgnoreCase(type)) {
- // For aliyun DLF
- hiveConf.set(DataLakeConfig.CATALOG_CREATE_DEFAULT_DB,
"false");
- metaStoreClient = new ProxyMetaStoreClient(hiveConf);
- } else {
- metaStoreClient = new HiveMetaStoreClient(hiveConf);
- }
- } catch (MetaException e) {
- LOG.warn("Create HiveMetaStoreClient failed: {}", e.getMessage());
- throw new DdlException("Create HiveMetaStoreClient failed: " +
e.getMessage());
- }
- return metaStoreClient;
- }
-
- public static Table getTable(HiveTable hiveTable) throws DdlException {
- IMetaStoreClient client =
getClient(hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS));
- Table table;
- try {
- table = client.getTable(hiveTable.getHiveDb(),
hiveTable.getHiveTable());
- } catch (TException e) {
- LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
- throw new DdlException("Connect hive metastore failed. Error: " +
e.getMessage());
- }
- return table;
- }
-
- /**
- * Get hive table with dbName and tableName.
- * Only for Hudi.
- *
- * @param dbName database name
- * @param tableName table name
- * @param metaStoreUris hive metastore uris
- * @return HiveTable
- * @throws DdlException when get table from hive metastore failed.
- */
- @Deprecated
- public static Table getTable(String dbName, String tableName, String
metaStoreUris) throws DdlException {
- IMetaStoreClient client = getClient(metaStoreUris);
- Table table;
- try {
- table = client.getTable(dbName, tableName);
- } catch (TException e) {
- LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
- throw new DdlException("Connect hive metastore failed. Error: " +
e.getMessage());
- } finally {
- client.close();
- }
- return table;
- }
-
/**
* Convert Doris expr to Hive expr, only for partition column
* @param tblName
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 72f19329046..70c61875b8a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -81,8 +81,8 @@ public class HiveMetadataOps implements ExternalMetadataOps {
return catalog;
}
- public static HMSCachedClient createCachedClient(HiveConf hiveConf, int
thriftClientPoolSize,
- JdbcClientConfig
jdbcClientConfig) {
+ private static HMSCachedClient createCachedClient(HiveConf hiveConf, int
thriftClientPoolSize,
+ JdbcClientConfig jdbcClientConfig) {
if (hiveConf != null) {
return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
}
@@ -266,6 +266,11 @@ public class HiveMetadataOps implements
ExternalMetadataOps {
return listDatabaseNames().contains(dbName);
}
+ @Override
+ public void close() {
+ client.close();
+ }
+
public List<String> listDatabaseNames() {
return client.getAllDatabases();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
index 932118001e5..8e41b48bfdd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
@@ -63,6 +63,11 @@ public class PostgreSQLJdbcHMSCachedClient extends
JdbcHMSCachedClient {
super(jdbcClientConfig);
}
+ @Override
+ public void close() {
+ // the jdbc connection is used on demand, so we do not need to close
it.
+ }
+
@Override
public Database getDatabase(String dbName) {
throw new HMSClientException("Do not support in
PostgreSQLJdbcHMSCachedClient.");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index 0f74da32018..0acf8893267 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -89,6 +89,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient
{
private static final short MAX_LIST_PARTITION_NUM =
Config.max_hive_list_partition_num;
private Queue<ThriftHMSClient> clientPool = new LinkedList<>();
+ private boolean isClosed = false;
private final int poolSize;
private final HiveConf hiveConf;
@@ -100,6 +101,21 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
}
this.hiveConf = hiveConf;
this.poolSize = poolSize;
+ this.isClosed = false;
+ }
+
+ @Override
+ public void close() {
+ synchronized (clientPool) {
+ this.isClosed = true;
+ while (!clientPool.isEmpty()) {
+ try {
+ clientPool.poll().close();
+ } catch (Exception e) {
+ LOG.warn("failed to close thrift client", e);
+ }
+ }
+ }
}
@Override
@@ -604,7 +620,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
@Override
public void close() throws Exception {
synchronized (clientPool) {
- if (throwable != null || clientPool.size() > poolSize) {
+ if (isClosed || throwable != null || clientPool.size() >
poolSize) {
client.close();
} else {
clientPool.offer(this);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index c59db7b4b79..7161f48680a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -64,6 +64,10 @@ public class IcebergMetadataOps implements
ExternalMetadataOps {
return catalog;
}
+ @Override
+ public void close() {
+ }
+
@Override
public boolean tableExist(String dbName, String tblName) {
return catalog.tableExists(TableIdentifier.of(dbName, tblName));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
index b603b7a3ca7..9426442cebb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
@@ -82,4 +82,9 @@ public interface ExternalMetadataOps {
boolean tableExist(String dbName, String tblName);
boolean databaseExist(String dbName);
+
+ /**
+ * close the connection, eg, to hms
+ */
+ void close();
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
index dd2e8dc2d11..6f969257245 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
@@ -52,6 +52,10 @@ public class TestHMSCachedClient implements HMSCachedClient {
public Map<String, List<Table>> tables = new HashMap<>();
public List<Database> dbs = new ArrayList<>();
+ @Override
+ public void close() {
+ }
+
@Override
public Database getDatabase(String dbName) {
for (Database db : this.dbs) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]