This is an automated email from the ASF dual-hosted git repository.
ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ed89565fcbe [fix](split) FileSystemCacheKey are always different in
overload equals (#36432)
ed89565fcbe is described below
commit ed89565fcbe3fa712a35a0fc30114d3ff65fe076
Author: Ashin Gau <[email protected]>
AuthorDate: Fri Jun 21 08:56:50 2024 +0800
[fix](split) FileSystemCacheKey are always different in overload equals
(#36432)
## Proposed changes
## Fixed Bugs introduced from #33937
1. `FileSystemCacheKey.equals()` compares properties by `==`, resulting
in creating a new file system for each partition
2. `dfsFileSystem` is not synchronized, resulting in creating more file
systems than needed.
3. `jobConf.iterator()` will produce more than 2000 key-value pairs
---
.../doris/datasource/hive/HiveMetaStoreCache.java | 20 +++++------
.../java/org/apache/doris/fs/FileSystemCache.java | 40 +++++++++++++++++-----
.../org/apache/doris/fs/remote/S3FileSystem.java | 24 +++++++------
.../apache/doris/fs/remote/dfs/DFSFileSystem.java | 35 +++++++++----------
4 files changed, 70 insertions(+), 49 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index b76b4675dee..f402d27cf6d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -349,11 +349,11 @@ public class HiveMetaStoreCache {
List<String> partitionValues,
String bindBrokerName) throws UserException {
FileCacheValue result = new FileCacheValue();
- Map<String, String> properties = new HashMap<>();
- jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(),
e.getValue()));
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new
FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
- location, bindBrokerName), properties,
bindBrokerName));
+ location, bindBrokerName),
+ catalog.getCatalogProperty().getProperties(),
+ bindBrokerName, jobConf));
result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
// For Tez engine, it may generate subdirectoies for "union" query.
// So there may be files and directories in the table directory at the
same time. eg:
@@ -781,12 +781,12 @@ public class HiveMetaStoreCache {
return Collections.emptyList();
}
String acidVersionPath = new Path(baseOrDeltaPath,
"_orc_acid_version").toUri().toString();
- Map<String, String> properties = new HashMap<>();
- jobConf.iterator().forEachRemaining(e ->
properties.put(e.getKey(), e.getValue()));
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
- bindBrokerName), properties,
bindBrokerName));
+ bindBrokerName),
+
catalog.getCatalogProperty().getProperties(),
+ bindBrokerName, jobConf));
Status status = fs.exists(acidVersionPath);
if (status != Status.OK) {
if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -806,12 +806,10 @@ public class HiveMetaStoreCache {
List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
for (AcidUtils.ParsedDelta delta :
directory.getCurrentDirectories()) {
String location = delta.getPath().toString();
- Map<String, String> properties = new HashMap<>();
- jobConf.iterator().forEachRemaining(e ->
properties.put(e.getKey(), e.getValue()));
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location,
bindBrokerName),
- properties, bindBrokerName));
+
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
@@ -833,12 +831,10 @@ public class HiveMetaStoreCache {
// base
if (directory.getBaseDirectory() != null) {
String location = directory.getBaseDirectory().toString();
- Map<String, String> properties = new HashMap<>();
- jobConf.iterator().forEachRemaining(e ->
properties.put(e.getKey(), e.getValue()));
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location,
bindBrokerName),
- properties, bindBrokerName));
+
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
index dd66c359b9d..e96258dc719 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
@@ -23,14 +23,16 @@ import org.apache.doris.common.Pair;
import org.apache.doris.fs.remote.RemoteFileSystem;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
public class FileSystemCache {
- private LoadingCache<FileSystemCacheKey, RemoteFileSystem> fileSystemCache;
+ private final LoadingCache<FileSystemCacheKey, RemoteFileSystem>
fileSystemCache;
public FileSystemCache() {
// no need to set refreshAfterWrite, because the FileSystem is created
once and never changed
@@ -40,11 +42,11 @@ public class FileSystemCache {
Config.max_remote_file_system_cache_num,
false,
null);
- fileSystemCache = fsCacheFactory.buildCache(key ->
loadFileSystem(key));
+ fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem);
}
private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
- return FileSystemFactory.getRemoteFileSystem(key.type, key.properties,
key.bindBrokerName);
+ return FileSystemFactory.getRemoteFileSystem(key.type,
key.getFsProperties(), key.bindBrokerName);
}
public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -57,13 +59,32 @@ public class FileSystemCache {
private final String fsIdent;
private final Map<String, String> properties;
private final String bindBrokerName;
+ // only for creating new file system
+ private final Configuration conf;
public FileSystemCacheKey(Pair<FileSystemType, String> fs,
- Map<String, String> properties, String bindBrokerName) {
+ Map<String, String> properties,
+ String bindBrokerName,
+ Configuration conf) {
this.type = fs.first;
this.fsIdent = fs.second;
this.properties = properties;
this.bindBrokerName = bindBrokerName;
+ this.conf = conf;
+ }
+
+ public FileSystemCacheKey(Pair<FileSystemType, String> fs,
+ Map<String, String> properties, String bindBrokerName) {
+ this(fs, properties, bindBrokerName, null);
+ }
+
+ public Map<String, String> getFsProperties() {
+ if (conf == null) {
+ return properties;
+ }
+ Map<String, String> result = new HashMap<>();
+ conf.iterator().forEachRemaining(e -> result.put(e.getKey(),
e.getValue()));
+ return result;
}
@Override
@@ -74,13 +95,14 @@ public class FileSystemCache {
if (!(obj instanceof FileSystemCacheKey)) {
return false;
}
- boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey)
obj).type)
- && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
- && properties == ((FileSystemCacheKey) obj).properties;
+ FileSystemCacheKey o = (FileSystemCacheKey) obj;
+ boolean equalsWithoutBroker = type.equals(o.type)
+ && fsIdent.equals(o.fsIdent)
+ && properties.equals(o.properties);
if (bindBrokerName == null) {
- return equalsWithoutBroker;
+ return equalsWithoutBroker && o.bindBrokerName == null;
}
- return equalsWithoutBroker &&
bindBrokerName.equals(((FileSystemCacheKey) obj).bindBrokerName);
+ return equalsWithoutBroker &&
bindBrokerName.equals(o.bindBrokerName);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 2b94d2195da..3130a0cea52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -58,16 +58,20 @@ public class S3FileSystem extends ObjFileSystem {
@Override
protected FileSystem nativeFileSystem(String remotePath) throws
UserException {
if (dfsFileSystem == null) {
- Configuration conf = new Configuration();
- System.setProperty("com.amazonaws.services.s3.enableV4", "true");
- // the entry value in properties may be null, and
-
PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
- .filter(entry -> entry.getKey() != null &&
entry.getValue() != null)
- .forEach(entry -> conf.set(entry.getKey(),
entry.getValue()));
- try {
- dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(),
conf);
- } catch (Exception e) {
- throw new UserException("Failed to get S3 FileSystem for " +
e.getMessage(), e);
+ synchronized (this) {
+ if (dfsFileSystem == null) {
+ Configuration conf = new Configuration();
+ System.setProperty("com.amazonaws.services.s3.enableV4",
"true");
+ // the entry value in properties may be null, and
+
PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
+ .filter(entry -> entry.getKey() != null &&
entry.getValue() != null)
+ .forEach(entry -> conf.set(entry.getKey(),
entry.getValue()));
+ try {
+ dfsFileSystem = FileSystem.get(new
Path(remotePath).toUri(), conf);
+ } catch (Exception e) {
+ throw new UserException("Failed to get S3 FileSystem
for " + e.getMessage(), e);
+ }
+ }
}
}
return dfsFileSystem;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 7f31f8eed49..d608653024f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -30,7 +30,6 @@ import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -73,25 +72,25 @@ public class DFSFileSystem extends RemoteFileSystem {
@VisibleForTesting
@Override
public FileSystem nativeFileSystem(String remotePath) throws UserException
{
- if (dfsFileSystem != null) {
- return dfsFileSystem;
- }
-
- Configuration conf = new HdfsConfiguration();
- for (Map.Entry<String, String> propEntry : properties.entrySet()) {
- conf.set(propEntry.getKey(), propEntry.getValue());
- }
+ if (dfsFileSystem == null) {
+ synchronized (this) {
+ if (dfsFileSystem == null) {
+ Configuration conf = new HdfsConfiguration();
+ for (Map.Entry<String, String> propEntry :
properties.entrySet()) {
+ conf.set(propEntry.getKey(), propEntry.getValue());
+ }
- dfsFileSystem =
HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
- try {
- return FileSystem.get(new Path(remotePath).toUri(), conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ dfsFileSystem =
HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
+ try {
+ return FileSystem.get(new
Path(remotePath).toUri(), conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ operations = new HDFSFileOperations(dfsFileSystem);
+ }
}
- });
-
- Preconditions.checkNotNull(dfsFileSystem);
- operations = new HDFSFileOperations(dfsFileSystem);
+ }
return dfsFileSystem;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]