This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 75cb520cbc5 [Enhancement](external catalog) Added status reset when
jdbc name mapping is abnormal (#33971)
75cb520cbc5 is described below
commit 75cb520cbc5b74a9ba47a4fdcfdaaeadcd33da35
Author: zy-kkk <[email protected]>
AuthorDate: Sat Apr 27 15:40:40 2024 +0800
[Enhancement](external catalog) Added status reset when jdbc name mapping
is abnormal (#33971)
---
.../datasource/mapping/IdentifierMapping.java | 82 +++++++++++++++-------
1 file changed, 57 insertions(+), 25 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
index cd121f2b630..363ef351152 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
@@ -26,6 +26,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
@@ -35,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class IdentifierMapping {
+ private static final Logger LOG =
LogManager.getLogger(IdentifierMapping.class);
private final ObjectMapper mapper = new ObjectMapper();
private final ConcurrentHashMap<String, String> localDBToRemoteDB = new
ConcurrentHashMap<>();
@@ -179,51 +182,59 @@ public abstract class IdentifierMapping {
}
public String getRemoteDatabaseName(String localDbName) {
- if (localDBToRemoteDB.isEmpty() ||
!localDBToRemoteDB.containsKey(localDbName)) {
- loadDatabaseNamesIfNeeded();
- }
- return localDBToRemoteDB.get(localDbName);
+ return getRequiredMapping(localDBToRemoteDB, localDbName, "database",
this::loadDatabaseNamesIfNeeded,
+ localDbName);
}
public String getRemoteTableName(String localDbName, String
localTableName) {
String remoteDbName = getRemoteDatabaseName(localDbName);
- if (localTableToRemoteTable.isEmpty()
- || !localTableToRemoteTable.containsKey(remoteDbName)
- || localTableToRemoteTable.get(remoteDbName) == null
- || localTableToRemoteTable.get(remoteDbName).isEmpty()
- ||
!localTableToRemoteTable.get(remoteDbName).containsKey(localTableName)
- ||
localTableToRemoteTable.get(remoteDbName).get(localTableName) == null) {
- loadTableNamesIfNeeded(localDbName);
- }
-
- return localTableToRemoteTable.get(remoteDbName).get(localTableName);
+ Map<String, String> tableMap =
localTableToRemoteTable.computeIfAbsent(remoteDbName,
+ k -> new ConcurrentHashMap<>());
+ return getRequiredMapping(tableMap, localTableName, "table", () ->
loadTableNamesIfNeeded(localDbName),
+ localTableName);
}
public Map<String, String> getRemoteColumnNames(String localDbName, String
localTableName) {
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName,
localTableName);
- if (localColumnToRemoteColumn.isEmpty()
- || !localColumnToRemoteColumn.containsKey(remoteDbName)
- || localColumnToRemoteColumn.get(remoteDbName) == null
- || localColumnToRemoteColumn.get(remoteDbName).isEmpty()
- ||
!localColumnToRemoteColumn.get(remoteDbName).containsKey(remoteTableName)
- ||
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName) == null
- ||
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName).isEmpty()) {
+ ConcurrentHashMap<String, ConcurrentHashMap<String, String>>
tableColumnMap
+ = localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k ->
new ConcurrentHashMap<>());
+ Map<String, String> columnMap =
tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());
+ if (columnMap.isEmpty()) {
+ LOG.info("Column name mapping missing, loading column names for
localDbName: {}, localTableName: {}",
+ localDbName, localTableName);
loadColumnNamesIfNeeded(localDbName, localTableName);
+ columnMap = tableColumnMap.get(remoteTableName);
}
- return
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName);
+ if (columnMap.isEmpty()) {
+ LOG.warn("No remote column found for localTableName: {}. Please
refresh this catalog.", localTableName);
+ throw new RuntimeException(
+ "No remote column found for localTableName: " +
localTableName + ". Please refresh this catalog.");
+ }
+ return columnMap;
}
+
private void loadDatabaseNamesIfNeeded() {
if (dbNamesLoaded.compareAndSet(false, true)) {
- loadDatabaseNames();
+ try {
+ loadDatabaseNames();
+ } catch (Exception e) {
+ dbNamesLoaded.set(false); // Reset on failure
+ LOG.warn("Error loading database names", e);
+ }
}
}
private void loadTableNamesIfNeeded(String localDbName) {
AtomicBoolean isLoaded =
tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false));
if (isLoaded.compareAndSet(false, true)) {
- loadTableNames(localDbName);
+ try {
+ loadTableNames(localDbName);
+ } catch (Exception e) {
+ tableNamesLoadedMap.get(localDbName).set(false); // Reset on failure
+ LOG.warn("Error loading table names for localDbName: {}",
localDbName, e);
+ }
}
}
@@ -232,8 +243,29 @@ public abstract class IdentifierMapping {
AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName)
.computeIfAbsent(localTableName, k -> new
AtomicBoolean(false));
if (isLoaded.compareAndSet(false, true)) {
- loadColumnNames(localDbName, localTableName);
+ try {
+ loadColumnNames(localDbName, localTableName);
+ } catch (Exception e) {
+ columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset on failure
+ LOG.warn("Error loading column names for localDbName: {},
localTableName: {}", localDbName,
+ localTableName, e);
+ }
+ }
+ }
+
+ private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName,
Runnable loadIfNeeded,
+ String entityName) {
+ if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) {
+ LOG.info("{} mapping missing, loading for {}: {}", typeName,
typeName, entityName);
+ loadIfNeeded.run();
+ }
+ V value = map.get(key);
+ if (value == null) {
+ LOG.warn("No remote {} found for {}: {}. Please refresh this
catalog.", typeName, typeName, entityName);
+ throw new RuntimeException("No remote " + typeName + " found for "
+ typeName + ": " + entityName
+ + ". Please refresh this catalog.");
}
+ return value;
}
// Load the database name from the data source.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]