This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 3eda77b3d9f [branch-2.1][improvement](jdbc catalog) Optimize
JdbcCatalog case mapping stability (#41330)
3eda77b3d9f is described below
commit 3eda77b3d9f1a6eba6977b581122e05142590a8f
Author: zy-kkk <[email protected]>
AuthorDate: Thu Sep 26 22:49:55 2024 +0800
[branch-2.1][improvement](jdbc catalog) Optimize JdbcCatalog case mapping
stability (#41330)
pick #40891
This PR makes the following changes to the uppercase/lowercase
identifier mapping of JdbcCatalog:
1. The identifierMapping is managed by JdbcExternalCatalog instead of
JdbcClient to better control its lifecycle
2. The identifierMapping no longer loads remote names on its own; instead,
the Catalog controls the loading uniformly
3. The identifierMapping will be loaded when each FE performs
makeSureInitialized() to ensure that each FE has a mapping
4. The mapping is initialized only once, in makeSureInitialized(). This
means that when identifierMapping is enabled and your source data is
updated, you must refresh the catalog to query normally, even if you use
metaCache.
5. The identifierMapping is only responsible for the properties of the
Catalog and is no longer affected by the fe config, simplifying the
processing logic
6. If lower_case_meta_names is false and meta_names_mapping is empty in
the catalog properties, the identifierMapping will no longer take
effect, further enhancing the stability of the default settings
7. The JdbcClient is no longer closed during onRefreshCache, reducing
the repeated creation of resources, improving reuse, and reducing the
leakage of some global shared threads
---
.../apache/doris/datasource/ExternalCatalog.java | 18 ++
.../apache/doris/datasource/ExternalDatabase.java | 7 +
.../doris/datasource/jdbc/JdbcExternalCatalog.java | 56 +++-
.../doris/datasource/jdbc/JdbcExternalTable.java | 25 +-
.../datasource/jdbc/JdbcIdentifierMapping.java | 45 ---
.../doris/datasource/jdbc/client/JdbcClient.java | 46 +--
.../datasource/jdbc/client/JdbcMySQLClient.java | 4 +-
.../datasource/jdbc/client/JdbcOracleClient.java | 4 +-
...rMapping.java => DefaultIdentifierMapping.java} | 202 +++++---------
.../datasource/mapping/IdentifierMapping.java | 307 +--------------------
10 files changed, 173 insertions(+), 541 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index f6e5a570cc9..1b3cd9c33a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -45,6 +45,7 @@ import
org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
+import org.apache.doris.datasource.mapping.IdentifierMapping;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
@@ -142,6 +143,9 @@ public abstract class ExternalCatalog
protected Optional<Boolean> useMetaCache = Optional.empty();
protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
+ protected IdentifierMapping identifierMapping;
+ private boolean mappingsInitialized = false;
+
public ExternalCatalog() {
}
@@ -174,6 +178,10 @@ public abstract class ExternalCatalog
}
}
+ // only for forward to master
+ protected void buildDatabaseMapping() {
+ }
+
// Will be called when creating catalog(so when as replaying)
// to add some default properties if missing.
public void setDefaultPropsIfMissing(boolean isReplay) {
@@ -202,6 +210,10 @@ public abstract class ExternalCatalog
*/
public abstract List<String> listTableNames(SessionContext ctx, String
dbName);
+ // only for forward to master
+ protected void buildTableMapping(SessionContext ctx, String dbName) {
+ }
+
/**
* check if the specified table exist.
*
@@ -266,6 +278,10 @@ public abstract class ExternalCatalog
}
initialized = true;
}
+ if (!mappingsInitialized) {
+ buildDatabaseMapping();
+ mappingsInitialized = true;
+ }
}
protected final void initLocalObjects() {
@@ -391,6 +407,7 @@ public abstract class ExternalCatalog
public void onRefresh(boolean invalidCache) {
this.objectCreated = false;
this.initialized = false;
+ this.mappingsInitialized = false;
synchronized (this.propLock) {
this.convertedProperties = null;
}
@@ -716,6 +733,7 @@ public abstract class ExternalCatalog
}
this.propLock = new byte[0];
this.initialized = false;
+ this.mappingsInitialized = false;
setDefaultPropsIfMissing(true);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
index d653a5a178e..cf65a5f0a48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
@@ -91,6 +91,8 @@ public abstract class ExternalDatabase<T extends
ExternalTable>
private MetaCache<T> metaCache;
+ private boolean mappingsInitialized = false;
+
/**
* Create external database.
*
@@ -117,6 +119,7 @@ public abstract class ExternalDatabase<T extends
ExternalTable>
public void setUnInitialized(boolean invalidCache) {
this.initialized = false;
+ this.mappingsInitialized = false;
this.invalidCacheInInit = invalidCache;
if (extCatalog.getUseMetaCache().isPresent()) {
if (extCatalog.getUseMetaCache().get() && metaCache != null) {
@@ -170,6 +173,10 @@ public abstract class ExternalDatabase<T extends
ExternalTable>
}
initialized = true;
}
+ if (!mappingsInitialized) {
+ extCatalog.buildTableMapping(null, name);
+ mappingsInitialized = true;
+ }
}
public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index 80cc0f554f6..d25f97e70bd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -31,6 +31,7 @@ import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.jdbc.client.JdbcClientException;
+import org.apache.doris.datasource.mapping.DefaultIdentifierMapping;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult;
@@ -118,19 +119,16 @@ public class JdbcExternalCatalog extends ExternalCatalog {
super.onRefresh(invalidCache);
if (jdbcClient != null) {
jdbcClient.closeClient();
+ jdbcClient = null;
}
}
- @Override
- public void onRefreshCache(boolean invalidCache) {
- onRefresh(invalidCache);
- }
-
@Override
public void onClose() {
super.onClose();
if (jdbcClient != null) {
jdbcClient.closeClient();
+ jdbcClient = null;
}
}
@@ -231,8 +229,6 @@ public class JdbcExternalCatalog extends ExternalCatalog {
.setDriverUrl(getDriverUrl())
.setDriverClass(getDriverClass())
.setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
- .setIsLowerCaseMetaNames(getLowerCaseMetaNames())
- .setMetaNamesMapping(getMetaNamesMapping())
.setIncludeDatabaseMap(getIncludeDatabaseMap())
.setExcludeDatabaseMap(getExcludeDatabaseMap())
.setConnectionPoolMinSize(getConnectionPoolMinSize())
@@ -242,22 +238,62 @@ public class JdbcExternalCatalog extends ExternalCatalog {
.setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());
jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig);
+ identifierMapping = new
DefaultIdentifierMapping(Boolean.parseBoolean(getLowerCaseMetaNames()),
+ getMetaNamesMapping());
}
+ @Override
protected List<String> listDatabaseNames() {
- return jdbcClient.getDatabaseNameList();
+ return
identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList());
+ }
+
+ @Override
+ protected void buildDatabaseMapping() {
+
identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList());
+ }
+
+ protected String getRemoteDatabaseName(String dbName) {
+ return identifierMapping.toRemoteDatabaseName(dbName);
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
- return jdbcClient.getTablesNameList(dbName);
+ String remoteDbName = getRemoteDatabaseName(dbName);
+ return identifierMapping.fromRemoteTableName(remoteDbName,
jdbcClient.getTablesNameList(remoteDbName));
+ }
+
+ @Override
+ protected void buildTableMapping(SessionContext ctx, String dbName) {
+ String remoteDbName = getRemoteDatabaseName(dbName);
+ identifierMapping.fromRemoteTableName(getRemoteDatabaseName(dbName),
+ jdbcClient.getTablesNameList(remoteDbName));
+ }
+
+ protected String getRemoteTableName(String dbName, String tblName) {
+ return
identifierMapping.toRemoteTableName(getRemoteDatabaseName(dbName), tblName);
}
@Override
public boolean tableExist(SessionContext ctx, String dbName, String
tblName) {
makeSureInitialized();
- return jdbcClient.isTableExist(dbName, tblName);
+ String remoteDbName = getRemoteDatabaseName(dbName);
+ String remoteTblName = getRemoteTableName(dbName, tblName);
+ return jdbcClient.isTableExist(remoteDbName, remoteTblName);
+ }
+
+ public List<Column> listColumns(String dbName, String tblName) {
+ makeSureInitialized();
+ String remoteDbName = getRemoteDatabaseName(dbName);
+ String remoteTblName = getRemoteTableName(dbName, tblName);
+ return identifierMapping.fromRemoteColumnName(remoteDbName,
remoteTblName,
+ jdbcClient.getColumnsFromJdbc(remoteDbName,
+ remoteTblName));
+ }
+
+ protected Map<String, String> getRemoteColumnNames(String dbName, String
tblName) {
+ return
identifierMapping.toRemoteColumnNames(getRemoteDatabaseName(dbName),
+ getRemoteTableName(dbName, tblName));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
index 07ce183a589..a3af7f5b820 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
@@ -32,6 +32,7 @@ import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TTableDescriptor;
+import com.google.common.collect.Maps;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -86,21 +87,29 @@ public class JdbcExternalTable extends ExternalTable {
@Override
public Optional<SchemaCacheValue> initSchema() {
- return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog)
catalog).getJdbcClient()
- .getColumnsFromJdbc(dbName, name)));
+ return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog)
catalog).listColumns(dbName, name)));
}
private JdbcTable toJdbcTable() {
List<Column> schema = getFullSchema();
JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog;
- String fullDbName = this.dbName + "." + this.name;
- JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema,
TableType.JDBC_EXTERNAL_TABLE);
- jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName);
+ String fullTableName = this.dbName + "." + this.name;
+ JdbcTable jdbcTable = new JdbcTable(this.id, fullTableName, schema,
TableType.JDBC_EXTERNAL_TABLE);
+ jdbcCatalog.configureJdbcTable(jdbcTable, fullTableName);
// Set remote properties
-
jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName));
-
jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName,
this.name));
-
jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName,
this.name));
+
jdbcTable.setRemoteDatabaseName(jdbcCatalog.getRemoteDatabaseName(this.dbName));
+
jdbcTable.setRemoteTableName(jdbcCatalog.getRemoteTableName(this.dbName,
this.name));
+ Map<String, String> remoteColumnNames =
jdbcCatalog.getRemoteColumnNames(this.dbName, this.name);
+ if (!remoteColumnNames.isEmpty()) {
+ jdbcTable.setRemoteColumnNames(remoteColumnNames);
+ } else {
+ remoteColumnNames = Maps.newHashMap();
+ for (Column column : schema) {
+ remoteColumnNames.put(column.getName(), column.getName());
+ }
+ jdbcTable.setRemoteColumnNames(remoteColumnNames);
+ }
return jdbcTable;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java
deleted file mode 100644
index 20a74724b3e..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.jdbc;
-
-import org.apache.doris.datasource.jdbc.client.JdbcClient;
-import org.apache.doris.datasource.mapping.IdentifierMapping;
-
-public class JdbcIdentifierMapping extends IdentifierMapping {
- private final JdbcClient jdbcClient;
-
- public JdbcIdentifierMapping(boolean isLowerCaseMetaNames, String
metaNamesMapping, JdbcClient jdbcClient) {
- super(isLowerCaseMetaNames, metaNamesMapping);
- this.jdbcClient = jdbcClient;
- }
-
- @Override
- protected void loadDatabaseNames() {
- jdbcClient.getDatabaseNameList();
- }
-
- @Override
- protected void loadTableNames(String localDbName) {
- jdbcClient.getTablesNameList(localDbName);
- }
-
- @Override
- protected void loadColumnNames(String localDbName, String localTableName) {
- jdbcClient.getColumnsFromJdbc(localDbName, localTableName);
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
index 0e57f989df3..2b82c074809 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.jdbc.JdbcIdentifierMapping;
import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema;
import com.google.common.collect.ImmutableSet;
@@ -62,11 +61,8 @@ public abstract class JdbcClient {
protected ClassLoader classLoader = null;
protected HikariDataSource dataSource = null;
protected boolean isOnlySpecifiedDatabase;
- protected boolean isLowerCaseMetaNames;
- protected String metaNamesMapping;
protected Map<String, Boolean> includeDatabaseMap;
protected Map<String, Boolean> excludeDatabaseMap;
- protected JdbcIdentifierMapping jdbcLowerCaseMetaMatching;
public static JdbcClient createJdbcClient(JdbcClientConfig
jdbcClientConfig) {
String dbType = parseDbType(jdbcClientConfig.getJdbcUrl());
@@ -101,8 +97,6 @@ public abstract class JdbcClient {
this.catalogName = jdbcClientConfig.getCatalog();
this.jdbcUser = jdbcClientConfig.getUser();
this.isOnlySpecifiedDatabase =
Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase());
- this.isLowerCaseMetaNames =
Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseMetaNames());
- this.metaNamesMapping = jdbcClientConfig.getMetaNamesMapping();
this.includeDatabaseMap =
Optional.ofNullable(jdbcClientConfig.getIncludeDatabaseMap()).orElse(Collections.emptyMap());
this.excludeDatabaseMap =
@@ -111,7 +105,6 @@ public abstract class JdbcClient {
this.dbType = parseDbType(jdbcUrl);
initializeClassLoader(jdbcClientConfig);
initializeDataSource(jdbcClientConfig);
- this.jdbcLowerCaseMetaMatching = new
JdbcIdentifierMapping(isLowerCaseMetaNames, metaNamesMapping, this);
}
// Initialize DataSource
@@ -294,10 +287,9 @@ public abstract class JdbcClient {
/**
* get all tables of one database
*/
- public List<String> getTablesNameList(String localDbName) {
+ public List<String> getTablesNameList(String remoteDbName) {
List<String> remoteTablesNames = Lists.newArrayList();
String[] tableTypes = getTableTypes();
- String remoteDbName = getRemoteDatabaseName(localDbName);
processTable(remoteDbName, null, tableTypes, (rs) -> {
try {
while (rs.next()) {
@@ -307,14 +299,12 @@ public abstract class JdbcClient {
throw new JdbcClientException("failed to get all tables for
remote database: `%s`", e, remoteDbName);
}
});
- return filterTableNames(remoteDbName, remoteTablesNames);
+ return remoteTablesNames;
}
- public boolean isTableExist(String localDbName, String localTableName) {
+ public boolean isTableExist(String remoteDbName, String remoteTableName) {
final boolean[] isExist = {false};
String[] tableTypes = getTableTypes();
- String remoteDbName = getRemoteDatabaseName(localDbName);
- String remoteTableName = getRemoteTableName(localDbName,
localTableName);
processTable(remoteDbName, remoteTableName, tableTypes, (rs) -> {
try {
if (rs.next()) {
@@ -331,12 +321,10 @@ public abstract class JdbcClient {
/**
* get all columns of one table
*/
- public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String
localTableName) {
+ public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName,
String remoteTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
- String remoteDbName = getRemoteDatabaseName(localDbName);
- String remoteTableName = getRemoteTableName(localDbName,
localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
@@ -362,21 +350,7 @@ public abstract class JdbcClient {
field.isAllowNull(), field.getRemarks(),
true, -1));
}
- String remoteDbName = getRemoteDatabaseName(localDbName);
- String remoteTableName = getRemoteTableName(localDbName,
localTableName);
- return filterColumnName(remoteDbName, remoteTableName,
dorisTableSchema);
- }
-
- public String getRemoteDatabaseName(String localDbname) {
- return jdbcLowerCaseMetaMatching.getRemoteDatabaseName(localDbname);
- }
-
- public String getRemoteTableName(String localDbName, String
localTableName) {
- return jdbcLowerCaseMetaMatching.getRemoteTableName(localDbName,
localTableName);
- }
-
- public Map<String, String> getRemoteColumnNames(String localDbName, String
localTableName) {
- return jdbcLowerCaseMetaMatching.getRemoteColumnNames(localDbName,
localTableName);
+ return dorisTableSchema;
}
// protected methods,for subclass to override
@@ -434,7 +408,7 @@ public abstract class JdbcClient {
}
filteredDatabaseNames.add(databaseName);
}
- return
jdbcLowerCaseMetaMatching.setDatabaseNameMapping(filteredDatabaseNames);
+ return filteredDatabaseNames;
}
protected Set<String> getFilterInternalDatabases() {
@@ -445,14 +419,6 @@ public abstract class JdbcClient {
.build();
}
- protected List<String> filterTableNames(String remoteDbName, List<String>
remoteTableNames) {
- return jdbcLowerCaseMetaMatching.setTableNameMapping(remoteDbName,
remoteTableNames);
- }
-
- protected List<Column> filterColumnName(String remoteDbName, String
remoteTableName, List<Column> remoteColumns) {
- return jdbcLowerCaseMetaMatching.setColumnNameMapping(remoteDbName,
remoteTableName, remoteColumns);
- }
-
protected abstract Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema);
protected Type createDecimalOrStringType(int precision, int scale) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
index 5624392de14..3baa2ce9d91 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
@@ -129,12 +129,10 @@ public class JdbcMySQLClient extends JdbcClient {
* get all columns of one table
*/
@Override
- public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String
localTableName) {
+ public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName,
String remoteTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
- String remoteDbName = getRemoteDatabaseName(localDbName);
- String remoteTableName = getRemoteTableName(localDbName,
localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
index d37b36cbf3d..dc367e8ea6e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
@@ -49,12 +49,10 @@ public class JdbcOracleClient extends JdbcClient {
}
@Override
- public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String
localTableName) {
+ public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName,
String remoteTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
- String remoteDbName = getRemoteDatabaseName(localDbName);
- String remoteTableName = getRemoteTableName(localDbName,
localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java
similarity index 54%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java
index 363ef351152..4847cd86e6d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java
@@ -18,7 +18,6 @@
package org.apache.doris.datasource.mapping;
import org.apache.doris.catalog.Column;
-import org.apache.doris.qe.GlobalVariable;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
@@ -34,10 +33,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-public abstract class IdentifierMapping {
- private static final Logger LOG =
LogManager.getLogger(IdentifierMapping.class);
+public class DefaultIdentifierMapping implements IdentifierMapping {
+ private static final Logger LOG =
LogManager.getLogger(DefaultIdentifierMapping.class);
private final ObjectMapper mapper = new ObjectMapper();
private final ConcurrentHashMap<String, String> localDBToRemoteDB = new
ConcurrentHashMap<>();
@@ -46,20 +44,24 @@ public abstract class IdentifierMapping {
private final ConcurrentHashMap<String, ConcurrentHashMap<String,
ConcurrentHashMap<String, String>>>
localColumnToRemoteColumn = new ConcurrentHashMap<>();
- private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false);
- private final ConcurrentHashMap<String, AtomicBoolean> tableNamesLoadedMap
= new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, ConcurrentHashMap<String,
AtomicBoolean>> columnNamesLoadedMap
- = new ConcurrentHashMap<>();
-
private final boolean isLowerCaseMetaNames;
private final String metaNamesMapping;
- public IdentifierMapping(boolean isLowerCaseMetaNames, String
metaNamesMapping) {
+ public DefaultIdentifierMapping(boolean isLowerCaseMetaNames, String
metaNamesMapping) {
this.isLowerCaseMetaNames = isLowerCaseMetaNames;
this.metaNamesMapping = metaNamesMapping;
}
- public List<String> setDatabaseNameMapping(List<String>
remoteDatabaseNames) {
+ private boolean isMappingInvalid() {
+ return metaNamesMapping == null || metaNamesMapping.isEmpty();
+ }
+
+ @Override
+ public List<String> fromRemoteDatabaseName(List<String>
remoteDatabaseNames) {
+ // If mapping is not required, return the original input
+ if (!isLowerCaseMetaNames && isMappingInvalid()) {
+ return remoteDatabaseNames;
+ }
JsonNode databasesNode = readAndParseJson(metaNamesMapping,
"databases");
Map<String, String> databaseNameMapping = Maps.newTreeMap();
@@ -84,7 +86,12 @@ public abstract class IdentifierMapping {
return localDatabaseNames;
}
- public List<String> setTableNameMapping(String remoteDbName, List<String>
remoteTableNames) {
+ @Override
+ public List<String> fromRemoteTableName(String remoteDbName, List<String>
remoteTableNames) {
+ // If mapping is not required, return the original input
+ if (!isLowerCaseMetaNames && isMappingInvalid()) {
+ return remoteTableNames;
+ }
JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables");
Map<String, String> tableNameMapping = Maps.newTreeMap();
@@ -101,40 +108,29 @@ public abstract class IdentifierMapping {
localTableToRemoteTable.putIfAbsent(remoteDbName, new
ConcurrentHashMap<>());
- List<String> localTableNames;
- List<String> conflictNames;
-
- if (GlobalVariable.lowerCaseTableNames == 1) {
- Map<String, List<String>> result =
nameListToMapping(remoteTableNames,
- localTableToRemoteTable.get(remoteDbName),
- tableNameMapping, true);
- localTableNames = result.get("localNames");
- conflictNames = result.get("conflictNames");
- if (!conflictNames.isEmpty()) {
- throw new RuntimeException(
- "Conflict table names found in remote database/schema:
" + remoteDbName
- + " when lower_case_table_names is 1: " +
conflictNames
- + ". Please use meta_name_mapping to specify
the names.");
- }
- } else {
- Map<String, List<String>> result =
nameListToMapping(remoteTableNames,
- localTableToRemoteTable.get(remoteDbName),
- tableNameMapping, isLowerCaseMetaNames);
- localTableNames = result.get("localNames");
- conflictNames = result.get("conflictNames");
-
- if (!conflictNames.isEmpty()) {
- throw new RuntimeException(
- "Conflict table names found in remote database/schema:
" + remoteDbName
- + "when lower_case_meta_names is true: " +
conflictNames
- + ". Please set lower_case_meta_names to false
or"
- + " use meta_name_mapping to specify the table
names.");
- }
+ Map<String, List<String>> result = nameListToMapping(remoteTableNames,
+ localTableToRemoteTable.get(remoteDbName),
+ tableNameMapping, isLowerCaseMetaNames);
+ List<String> localTableNames = result.get("localNames");
+ List<String> conflictNames = result.get("conflictNames");
+
+ if (!conflictNames.isEmpty()) {
+ throw new RuntimeException(
+ "Conflict table names found in remote database/schema: " +
remoteDbName
+ + " when lower_case_meta_names is true: " +
conflictNames
+ + ". Please set lower_case_meta_names to false or"
+ + " use meta_name_mapping to specify the table
names.");
}
return localTableNames;
}
- public List<Column> setColumnNameMapping(String remoteDbName, String
remoteTableName, List<Column> remoteColumns) {
+ @Override
+ public List<Column> fromRemoteColumnName(String remoteDatabaseName, String
remoteTableName,
+ List<Column> remoteColumns) {
+ // If mapping is not required, return the original input
+ if (!isLowerCaseMetaNames && isMappingInvalid()) {
+ return remoteColumns;
+ }
JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns");
Map<String, String> columnNameMapping = Maps.newTreeMap();
@@ -142,123 +138,78 @@ public abstract class IdentifierMapping {
for (JsonNode node : tablesNode) {
String remoteDatabase = node.path("remoteDatabase").asText();
String remoteTable = node.path("remoteTable").asText();
- if (remoteDbName.equals(remoteDatabase) &&
remoteTable.equals(remoteTableName)) {
+ if (remoteDatabaseName.equals(remoteDatabase) &&
remoteTable.equals(remoteTableName)) {
String remoteColumn = node.path("remoteColumn").asText();
String mapping = node.path("mapping").asText();
columnNameMapping.put(remoteColumn, mapping);
}
}
}
- localColumnToRemoteColumn.putIfAbsent(remoteDbName, new
ConcurrentHashMap<>());
-
localColumnToRemoteColumn.get(remoteDbName).putIfAbsent(remoteTableName, new
ConcurrentHashMap<>());
+ localColumnToRemoteColumn.putIfAbsent(remoteDatabaseName, new
ConcurrentHashMap<>());
+
localColumnToRemoteColumn.get(remoteDatabaseName).putIfAbsent(remoteTableName,
new ConcurrentHashMap<>());
- List<String> localColumnNames;
- List<String> conflictNames;
-
- // Get the name from localColumns and save it to List<String>
List<String> remoteColumnNames = Lists.newArrayList();
for (Column remoteColumn : remoteColumns) {
remoteColumnNames.add(remoteColumn.getName());
}
Map<String, List<String>> result = nameListToMapping(remoteColumnNames,
-
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName),
+
localColumnToRemoteColumn.get(remoteDatabaseName).get(remoteTableName),
columnNameMapping, isLowerCaseMetaNames);
- localColumnNames = result.get("localNames");
- conflictNames = result.get("conflictNames");
+ List<String> localColumnNames = result.get("localNames");
+ List<String> conflictNames = result.get("conflictNames");
if (!conflictNames.isEmpty()) {
throw new RuntimeException(
- "Conflict column names found in remote database/schema: "
+ remoteDbName
+ "Conflict column names found in remote database/schema: "
+ remoteDatabaseName
+ " in remote table: " + remoteTableName
+ " when lower_case_meta_names is true: " +
conflictNames
+ ". Please set lower_case_meta_names to false or"
+ " use meta_name_mapping to specify the column
names.");
}
- // Replace the name in remoteColumns with localColumnNames
for (int i = 0; i < remoteColumns.size(); i++) {
remoteColumns.get(i).setName(localColumnNames.get(i));
}
return remoteColumns;
}
- public String getRemoteDatabaseName(String localDbName) {
- return getRequiredMapping(localDBToRemoteDB, localDbName, "database",
this::loadDatabaseNamesIfNeeded,
- localDbName);
+ @Override
+ public String toRemoteDatabaseName(String localDatabaseName) {
+ // If mapping is not required, return the original input
+ if (!isLowerCaseMetaNames && isMappingInvalid()) {
+ return localDatabaseName;
+ }
+ return getRequiredMapping(localDBToRemoteDB, localDatabaseName,
"database", localDatabaseName);
}
- public String getRemoteTableName(String localDbName, String
localTableName) {
- String remoteDbName = getRemoteDatabaseName(localDbName);
- Map<String, String> tableMap =
localTableToRemoteTable.computeIfAbsent(remoteDbName,
+ @Override
+ public String toRemoteTableName(String remoteDatabaseName, String
localTableName) {
+ // If mapping is not required, return the original input
+ if (!isLowerCaseMetaNames && isMappingInvalid()) {
+ return localTableName;
+ }
+ Map<String, String> tableMap =
localTableToRemoteTable.computeIfAbsent(remoteDatabaseName,
k -> new ConcurrentHashMap<>());
- return getRequiredMapping(tableMap, localTableName, "table", () ->
loadTableNamesIfNeeded(localDbName),
- localTableName);
+ return getRequiredMapping(tableMap, localTableName, "table",
localTableName);
}
- public Map<String, String> getRemoteColumnNames(String localDbName, String
localTableName) {
- String remoteDbName = getRemoteDatabaseName(localDbName);
- String remoteTableName = getRemoteTableName(localDbName,
localTableName);
+ @Override
+ public Map<String, String> toRemoteColumnNames(String remoteDatabaseName,
String remoteTableName) {
+ // If mapping is not required, return an empty map (since there's no
mapping)
+ if (!isLowerCaseMetaNames && isMappingInvalid()) {
+ return Collections.emptyMap();
+ }
ConcurrentHashMap<String, ConcurrentHashMap<String, String>>
tableColumnMap
- = localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k ->
new ConcurrentHashMap<>());
+ =
localColumnToRemoteColumn.computeIfAbsent(remoteDatabaseName, k -> new
ConcurrentHashMap<>());
Map<String, String> columnMap =
tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());
if (columnMap.isEmpty()) {
- LOG.info("Column name mapping missing, loading column names for
localDbName: {}, localTableName: {}",
- localDbName, localTableName);
- loadColumnNamesIfNeeded(localDbName, localTableName);
- columnMap = tableColumnMap.get(remoteTableName);
- }
- if (columnMap.isEmpty()) {
- LOG.warn("No remote column found for localTableName: {}. Please
refresh this catalog.", localTableName);
+ LOG.warn("No remote column found for: {}. Please refresh this
catalog.", remoteTableName);
throw new RuntimeException(
- "No remote column found for localTableName: " +
localTableName + ". Please refresh this catalog.");
+ "No remote column found for: " + remoteTableName + ".
Please refresh this catalog.");
}
return columnMap;
}
-
- private void loadDatabaseNamesIfNeeded() {
- if (dbNamesLoaded.compareAndSet(false, true)) {
- try {
- loadDatabaseNames();
- } catch (Exception e) {
- dbNamesLoaded.set(false); // Reset on failure
- LOG.warn("Error loading database names", e);
- }
- }
- }
-
- private void loadTableNamesIfNeeded(String localDbName) {
- AtomicBoolean isLoaded =
tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false));
- if (isLoaded.compareAndSet(false, true)) {
- try {
- loadTableNames(localDbName);
- } catch (Exception e) {
- tableNamesLoadedMap.get(localDbName).set(false); // Reset on
failure
- LOG.warn("Error loading table names for localDbName: {}",
localDbName, e);
- }
- }
- }
-
- private void loadColumnNamesIfNeeded(String localDbName, String
localTableName) {
- columnNamesLoadedMap.putIfAbsent(localDbName, new
ConcurrentHashMap<>());
- AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName)
- .computeIfAbsent(localTableName, k -> new
AtomicBoolean(false));
- if (isLoaded.compareAndSet(false, true)) {
- try {
- loadColumnNames(localDbName, localTableName);
- } catch (Exception e) {
-
columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset
on failure
- LOG.warn("Error loading column names for localDbName: {},
localTableName: {}", localDbName,
- localTableName, e);
- }
- }
- }
-
- private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName,
Runnable loadIfNeeded,
- String entityName) {
- if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) {
- LOG.info("{} mapping missing, loading for {}: {}", typeName,
typeName, entityName);
- loadIfNeeded.run();
- }
+ private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName,
String entityName) {
V value = map.get(key);
if (value == null) {
LOG.warn("No remote {} found for {}: {}. Please refresh this
catalog.", typeName, typeName, entityName);
@@ -268,18 +219,6 @@ public abstract class IdentifierMapping {
return value;
}
- // Load the database name from the data source.
- // In the corresponding getDatabaseNameList(), setDatabaseNameMapping()
must be used to update the mapping.
- protected abstract void loadDatabaseNames();
-
- // Load the table names for the specified database from the data source.
- // In the corresponding getTableNameList(), setTableNameMapping() must be
used to update the mapping.
- protected abstract void loadTableNames(String localDbName);
-
- // Load the column names for a specified table in a database from the data
source.
- // In the corresponding getColumnNameList(), setColumnNameMapping() must
be used to update the mapping.
- protected abstract void loadColumnNames(String localDbName, String
localTableName);
-
private JsonNode readAndParseJson(String jsonPath, String nodeName) {
JsonNode rootNode;
try {
@@ -302,7 +241,6 @@ public abstract class IdentifierMapping {
String mappedName = nameMapping.getOrDefault(name, name);
String localName = isLowerCaseMetaNames ? mappedName.toLowerCase()
: mappedName;
- // Use computeIfAbsent to ensure atomicity
localNameToRemoteName.computeIfAbsent(localName, k -> name);
if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
index 363ef351152..7745a25d27d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
@@ -18,313 +18,20 @@
package org.apache.doris.datasource.mapping;
import org.apache.doris.catalog.Column;
-import org.apache.doris.qe.GlobalVariable;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public abstract class IdentifierMapping {
- private static final Logger LOG =
LogManager.getLogger(IdentifierMapping.class);
-
- private final ObjectMapper mapper = new ObjectMapper();
- private final ConcurrentHashMap<String, String> localDBToRemoteDB = new
ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>>
localTableToRemoteTable
- = new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, ConcurrentHashMap<String,
ConcurrentHashMap<String, String>>>
- localColumnToRemoteColumn = new ConcurrentHashMap<>();
-
- private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false);
- private final ConcurrentHashMap<String, AtomicBoolean> tableNamesLoadedMap
= new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, ConcurrentHashMap<String,
AtomicBoolean>> columnNamesLoadedMap
- = new ConcurrentHashMap<>();
-
- private final boolean isLowerCaseMetaNames;
- private final String metaNamesMapping;
-
- public IdentifierMapping(boolean isLowerCaseMetaNames, String
metaNamesMapping) {
- this.isLowerCaseMetaNames = isLowerCaseMetaNames;
- this.metaNamesMapping = metaNamesMapping;
- }
-
- public List<String> setDatabaseNameMapping(List<String>
remoteDatabaseNames) {
- JsonNode databasesNode = readAndParseJson(metaNamesMapping,
"databases");
-
- Map<String, String> databaseNameMapping = Maps.newTreeMap();
- if (databasesNode.isArray()) {
- for (JsonNode node : databasesNode) {
- String remoteDatabase = node.path("remoteDatabase").asText();
- String mapping = node.path("mapping").asText();
- databaseNameMapping.put(remoteDatabase, mapping);
- }
- }
-
- Map<String, List<String>> result =
nameListToMapping(remoteDatabaseNames, localDBToRemoteDB,
- databaseNameMapping, isLowerCaseMetaNames);
- List<String> localDatabaseNames = result.get("localNames");
- List<String> conflictNames = result.get("conflictNames");
- if (!conflictNames.isEmpty()) {
- throw new RuntimeException(
- "Conflict database/schema names found when
lower_case_meta_names is true: " + conflictNames
- + ". Please set lower_case_meta_names to false or"
- + " use meta_name_mapping to specify the names.");
- }
- return localDatabaseNames;
- }
-
- public List<String> setTableNameMapping(String remoteDbName, List<String>
remoteTableNames) {
- JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables");
-
- Map<String, String> tableNameMapping = Maps.newTreeMap();
- if (tablesNode.isArray()) {
- for (JsonNode node : tablesNode) {
- String remoteDatabase = node.path("remoteDatabase").asText();
- if (remoteDbName.equals(remoteDatabase)) {
- String remoteTable = node.path("remoteTable").asText();
- String mapping = node.path("mapping").asText();
- tableNameMapping.put(remoteTable, mapping);
- }
- }
- }
-
- localTableToRemoteTable.putIfAbsent(remoteDbName, new
ConcurrentHashMap<>());
-
- List<String> localTableNames;
- List<String> conflictNames;
-
- if (GlobalVariable.lowerCaseTableNames == 1) {
- Map<String, List<String>> result =
nameListToMapping(remoteTableNames,
- localTableToRemoteTable.get(remoteDbName),
- tableNameMapping, true);
- localTableNames = result.get("localNames");
- conflictNames = result.get("conflictNames");
- if (!conflictNames.isEmpty()) {
- throw new RuntimeException(
- "Conflict table names found in remote database/schema:
" + remoteDbName
- + " when lower_case_table_names is 1: " +
conflictNames
- + ". Please use meta_name_mapping to specify
the names.");
- }
- } else {
- Map<String, List<String>> result =
nameListToMapping(remoteTableNames,
- localTableToRemoteTable.get(remoteDbName),
- tableNameMapping, isLowerCaseMetaNames);
- localTableNames = result.get("localNames");
- conflictNames = result.get("conflictNames");
-
- if (!conflictNames.isEmpty()) {
- throw new RuntimeException(
- "Conflict table names found in remote database/schema:
" + remoteDbName
- + "when lower_case_meta_names is true: " +
conflictNames
- + ". Please set lower_case_meta_names to false
or"
- + " use meta_name_mapping to specify the table
names.");
- }
- }
- return localTableNames;
- }
-
- public List<Column> setColumnNameMapping(String remoteDbName, String
remoteTableName, List<Column> remoteColumns) {
- JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns");
-
- Map<String, String> columnNameMapping = Maps.newTreeMap();
- if (tablesNode.isArray()) {
- for (JsonNode node : tablesNode) {
- String remoteDatabase = node.path("remoteDatabase").asText();
- String remoteTable = node.path("remoteTable").asText();
- if (remoteDbName.equals(remoteDatabase) &&
remoteTable.equals(remoteTableName)) {
- String remoteColumn = node.path("remoteColumn").asText();
- String mapping = node.path("mapping").asText();
- columnNameMapping.put(remoteColumn, mapping);
- }
- }
- }
- localColumnToRemoteColumn.putIfAbsent(remoteDbName, new
ConcurrentHashMap<>());
-
localColumnToRemoteColumn.get(remoteDbName).putIfAbsent(remoteTableName, new
ConcurrentHashMap<>());
-
- List<String> localColumnNames;
- List<String> conflictNames;
-
- // Get the name from localColumns and save it to List<String>
- List<String> remoteColumnNames = Lists.newArrayList();
- for (Column remoteColumn : remoteColumns) {
- remoteColumnNames.add(remoteColumn.getName());
- }
-
- Map<String, List<String>> result = nameListToMapping(remoteColumnNames,
-
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName),
- columnNameMapping, isLowerCaseMetaNames);
- localColumnNames = result.get("localNames");
- conflictNames = result.get("conflictNames");
- if (!conflictNames.isEmpty()) {
- throw new RuntimeException(
- "Conflict column names found in remote database/schema: "
+ remoteDbName
- + " in remote table: " + remoteTableName
- + " when lower_case_meta_names is true: " +
conflictNames
- + ". Please set lower_case_meta_names to false or"
- + " use meta_name_mapping to specify the column
names.");
- }
- // Replace the name in remoteColumns with localColumnNames
- for (int i = 0; i < remoteColumns.size(); i++) {
- remoteColumns.get(i).setName(localColumnNames.get(i));
- }
- return remoteColumns;
- }
-
- public String getRemoteDatabaseName(String localDbName) {
- return getRequiredMapping(localDBToRemoteDB, localDbName, "database",
this::loadDatabaseNamesIfNeeded,
- localDbName);
- }
-
- public String getRemoteTableName(String localDbName, String
localTableName) {
- String remoteDbName = getRemoteDatabaseName(localDbName);
- Map<String, String> tableMap =
localTableToRemoteTable.computeIfAbsent(remoteDbName,
- k -> new ConcurrentHashMap<>());
- return getRequiredMapping(tableMap, localTableName, "table", () ->
loadTableNamesIfNeeded(localDbName),
- localTableName);
- }
-
- public Map<String, String> getRemoteColumnNames(String localDbName, String
localTableName) {
- String remoteDbName = getRemoteDatabaseName(localDbName);
- String remoteTableName = getRemoteTableName(localDbName,
localTableName);
- ConcurrentHashMap<String, ConcurrentHashMap<String, String>>
tableColumnMap
- = localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k ->
new ConcurrentHashMap<>());
- Map<String, String> columnMap =
tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());
- if (columnMap.isEmpty()) {
- LOG.info("Column name mapping missing, loading column names for
localDbName: {}, localTableName: {}",
- localDbName, localTableName);
- loadColumnNamesIfNeeded(localDbName, localTableName);
- columnMap = tableColumnMap.get(remoteTableName);
- }
- if (columnMap.isEmpty()) {
- LOG.warn("No remote column found for localTableName: {}. Please
refresh this catalog.", localTableName);
- throw new RuntimeException(
- "No remote column found for localTableName: " +
localTableName + ". Please refresh this catalog.");
- }
- return columnMap;
- }
-
-
- private void loadDatabaseNamesIfNeeded() {
- if (dbNamesLoaded.compareAndSet(false, true)) {
- try {
- loadDatabaseNames();
- } catch (Exception e) {
- dbNamesLoaded.set(false); // Reset on failure
- LOG.warn("Error loading database names", e);
- }
- }
- }
-
- private void loadTableNamesIfNeeded(String localDbName) {
- AtomicBoolean isLoaded =
tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false));
- if (isLoaded.compareAndSet(false, true)) {
- try {
- loadTableNames(localDbName);
- } catch (Exception e) {
- tableNamesLoadedMap.get(localDbName).set(false); // Reset on
failure
- LOG.warn("Error loading table names for localDbName: {}",
localDbName, e);
- }
- }
- }
-
- private void loadColumnNamesIfNeeded(String localDbName, String
localTableName) {
- columnNamesLoadedMap.putIfAbsent(localDbName, new
ConcurrentHashMap<>());
- AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName)
- .computeIfAbsent(localTableName, k -> new
AtomicBoolean(false));
- if (isLoaded.compareAndSet(false, true)) {
- try {
- loadColumnNames(localDbName, localTableName);
- } catch (Exception e) {
-
columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset
on failure
- LOG.warn("Error loading column names for localDbName: {},
localTableName: {}", localDbName,
- localTableName, e);
- }
- }
- }
-
- private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName,
Runnable loadIfNeeded,
- String entityName) {
- if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) {
- LOG.info("{} mapping missing, loading for {}: {}", typeName,
typeName, entityName);
- loadIfNeeded.run();
- }
- V value = map.get(key);
- if (value == null) {
- LOG.warn("No remote {} found for {}: {}. Please refresh this
catalog.", typeName, typeName, entityName);
- throw new RuntimeException("No remote " + typeName + " found for "
+ typeName + ": " + entityName
- + ". Please refresh this catalog.");
- }
- return value;
- }
-
- // Load the database name from the data source.
- // In the corresponding getDatabaseNameList(), setDatabaseNameMapping()
must be used to update the mapping.
- protected abstract void loadDatabaseNames();
-
- // Load the table names for the specified database from the data source.
- // In the corresponding getTableNameList(), setTableNameMapping() must be
used to update the mapping.
- protected abstract void loadTableNames(String localDbName);
-
- // Load the column names for a specified table in a database from the data
source.
- // In the corresponding getColumnNameList(), setColumnNameMapping() must
be used to update the mapping.
- protected abstract void loadColumnNames(String localDbName, String
localTableName);
-
- private JsonNode readAndParseJson(String jsonPath, String nodeName) {
- JsonNode rootNode;
- try {
- rootNode = mapper.readTree(jsonPath);
- return rootNode.path(nodeName);
- } catch (JsonProcessingException e) {
- throw new RuntimeException("parse meta_names_mapping property
error", e);
- }
- }
-
- private Map<String, List<String>> nameListToMapping(List<String>
remoteNames,
- ConcurrentHashMap<String, String> localNameToRemoteName,
- Map<String, String> nameMapping, boolean isLowerCaseMetaNames) {
- List<String> filteredDatabaseNames = Lists.newArrayList();
- Set<String> lowerCaseNames = Sets.newHashSet();
- Map<String, List<String>> nameMap = Maps.newHashMap();
- List<String> conflictNames = Lists.newArrayList();
- for (String name : remoteNames) {
- String mappedName = nameMapping.getOrDefault(name, name);
- String localName = isLowerCaseMetaNames ? mappedName.toLowerCase()
: mappedName;
+public interface IdentifierMapping {
+ List<String> fromRemoteDatabaseName(List<String> remoteDatabaseNames);
- // Use computeIfAbsent to ensure atomicity
- localNameToRemoteName.computeIfAbsent(localName, k -> name);
+ List<String> fromRemoteTableName(String remoteDatabaseName, List<String>
remoteTableNames);
- if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) {
- if (nameMap.containsKey(localName)) {
- nameMap.get(localName).add(mappedName);
- }
- } else {
- nameMap.putIfAbsent(localName,
Lists.newArrayList(Collections.singletonList(mappedName)));
- }
+ List<Column> fromRemoteColumnName(String remoteDatabaseName, String
remoteTableName, List<Column> remoteColumns);
- filteredDatabaseNames.add(localName);
- }
+ String toRemoteDatabaseName(String localDatabaseName);
- for (List<String> conflictNameList : nameMap.values()) {
- if (conflictNameList.size() > 1) {
- conflictNames.addAll(conflictNameList);
- }
- }
+ String toRemoteTableName(String remoteDatabaseName, String localTableName);
- Map<String, List<String>> result = Maps.newConcurrentMap();
- result.put("localNames", filteredDatabaseNames);
- result.put("conflictNames", conflictNames);
- return result;
- }
+ Map<String, String> toRemoteColumnNames(String remoteDatabaseName, String
remoteTableName);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]