This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f87f29e1ab [fix](multi-catalog)compatible with hdfs HA empty prefix (#22342)
f87f29e1ab is described below
commit f87f29e1abf7c5d6fd20ce25f3e6b3f1e61938c2
Author: slothever <[email protected]>
AuthorDate: Sun Jul 30 22:21:14 2023 +0800
[fix](multi-catalog)compatible with hdfs HA empty prefix (#22342)
Make locations compatible with an empty HDFS HA prefix.
For example, 'hdfs:///' will be replaced with 'hdfs://ha-nameservice/'.
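As a rough illustration of that substitution (a standalone sketch, not the patch itself; the class name HdfsHaPrefixExample, the helper normalize, and the nameservice value "ha-nameservice" are assumptions made for the example):

    import java.net.URI;
    import java.net.URISyntaxException;

    public class HdfsHaPrefixExample {
        private static final String HDFS_PREFIX = "hdfs://";

        // Rewrite an authority-less HDFS location (hdfs:///path) so it carries the HA nameservice.
        static String normalize(String location, String nameservice) throws URISyntaxException {
            URI uri = new URI(location);
            boolean emptyHost = uri.getHost() == null || uri.getHost().isEmpty();
            if (emptyHost && location.startsWith(HDFS_PREFIX)) {
                if (nameservice == null || nameservice.isEmpty()) {
                    // Without an HA config there is no host to fill in, so reject the location.
                    throw new IllegalArgumentException("Invalid location with empty host: " + location);
                }
                // hdfs:///abc -> hdfs://ha-nameservice/abc
                return location.replace(HDFS_PREFIX, HDFS_PREFIX + nameservice);
            }
            return location;
        }

        public static void main(String[] args) throws URISyntaxException {
            // Prints: hdfs://ha-nameservice/user/hive/warehouse/t1
            System.out.println(normalize("hdfs:///user/hive/warehouse/t1", "ha-nameservice"));
        }
    }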
---
.../org/apache/doris/catalog/HdfsResource.java | 1 +
.../java/org/apache/doris/common/util/S3Util.java | 35 +++++++++++++++++++---
.../doris/datasource/hive/HiveMetaStoreCache.java | 7 +++--
.../planner/external/iceberg/IcebergScanNode.java | 7 +++--
.../planner/external/iceberg/IcebergSplit.java | 6 +++-
5 files changed, 45 insertions(+), 11 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index cdfb169590..2b50ec63b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -53,6 +53,7 @@ public class HdfsResource extends Resource {
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
public static String DSF_NAMESERVICES = "dfs.nameservices";
+ public static final String HDFS_PREFIX = "hdfs://";
@SerializedName(value = "properties")
private Map<String, String> properties;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index 64c897c306..a47d838537 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -17,9 +17,11 @@
package org.apache.doris.common.util;
+import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.credentials.CloudCredential;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -38,7 +40,9 @@ import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import java.net.URI;
+import java.net.URISyntaxException;
import java.time.Duration;
+import java.util.Map;
public class S3Util {
private static final Logger LOG = LogManager.getLogger(S3Util.class);
@@ -63,7 +67,7 @@ public class S3Util {
* @param location origin location
* @return metadata location path. just convert when storage is compatible with s3 client.
*/
- public static String convertToS3IfNecessary(String location) {
+ public static String convertToS3IfNecessary(String location, Map<String, String> props) {
LOG.debug("try convert location to s3 prefix: " + location);
if (isObjStorageUseS3Client(location)) {
int pos = location.indexOf("://");
@@ -72,7 +76,30 @@ public class S3Util {
}
return "s3" + location.substring(pos);
}
- return location;
+ return normalizedLocation(location, props);
+ }
+
+ private static String normalizedLocation(String location, Map<String, String> props) {
+ try {
+ URI normalizedUri = new URI(location);
+ if (StringUtils.isEmpty(normalizedUri.getHost()) && location.startsWith(HdfsResource.HDFS_PREFIX)) {
+ // Need to add the hdfs host to the location
+ String host = props.get(HdfsResource.DSF_NAMESERVICES);
+ if (StringUtils.isNotEmpty(host)) {
+ // Replace 'hdfs://' with 'hdfs://name_service', for example: hdfs:///abc to hdfs://name_service/abc
+ return location.replace(HdfsResource.HDFS_PREFIX, HdfsResource.HDFS_PREFIX + host);
+ } else {
+ // If no hadoop HA config
+ if (location.startsWith(HdfsResource.HDFS_PREFIX + '/')) {
+ // Do not support hdfs:///location
+ throw new RuntimeException("Invalid location with empty host: " + location);
+ }
+ }
+ }
+ return location;
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
}
/**
@@ -80,7 +107,7 @@ public class S3Util {
* @param location origin split path
* @return BE scan range path
*/
- public static Path toScanRangeLocation(String location) {
+ public static Path toScanRangeLocation(String location, Map<String, String> props) {
// All storage will use s3 client on BE.
if (isObjStorage(location)) {
int pos = location.indexOf("://");
@@ -95,7 +122,7 @@ public class S3Util {
location = "s3" + location.substring(pos);
}
}
- return new Path(location);
+ return new Path(normalizedLocation(location, props));
}
public static boolean isHdfsOnOssEndpoint(String location) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 0bd190d945..e1fa35d07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -378,7 +378,7 @@ public class HiveMetaStoreCache {
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
for (RemoteFile remoteFile : locatedFiles.files()) {
Path srcPath = remoteFile.getPath();
- Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString());
+ Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString(), catalog.getProperties());
if (!convertedPath.toString().equals(srcPath.toString())) {
remoteFile.setPath(convertedPath);
}
@@ -403,7 +403,7 @@ public class HiveMetaStoreCache {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
- String finalLocation = S3Util.convertToS3IfNecessary(key.location);
+ String finalLocation = S3Util.convertToS3IfNecessary(key.location, catalog.getProperties());
// disable the fs cache in FileSystem, or it will always from new FileSystem
// and save it in cache when calling FileInputFormat.setInputPaths().
try {
@@ -437,7 +437,8 @@ public class HiveMetaStoreCache {
for (int i = 0; i < splits.length; i++) {
org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
// todo: get modification time
- Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString());
+ Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString(),
+ catalog.getProperties());
result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null));
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 3d3634fb66..23bd919461 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -125,7 +125,7 @@ public class IcebergScanNode extends FileQueryScanNode {
for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
String deleteFilePath = filter.getDeleteFilePath();
- deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath).toString());
+ deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath, icebergSplit.getConfig()).toString());
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
fileDesc.setContent(FileContent.POSITION_DELETES.id());
IcebergDeleteFileFilter.PositionDelete positionDelete =
@@ -188,13 +188,14 @@ public class IcebergScanNode extends FileQueryScanNode {
TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
String dataFilePath = splitTask.file().path().toString();
- Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath);
+ Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
IcebergSplit split = new IcebergSplit(
finalDataFilePath,
splitTask.start(),
splitTask.length(),
splitTask.file().fileSizeInBytes(),
- new String[0]);
+ new String[0],
+ source.getCatalog().getProperties());
split.setFormatVersion(formatVersion);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 9064017088..de3f2ec6aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -23,16 +23,20 @@ import lombok.Data;
import org.apache.hadoop.fs.Path;
import java.util.List;
+import java.util.Map;
@Data
public class IcebergSplit extends FileSplit {
// File path will be changed if the file is modified, so there's no need to get modification time.
- public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts) {
+ public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
+ Map<String, String> config) {
super(file, start, length, fileLength, hosts, null);
+ this.config = config;
}
private Integer formatVersion;
private List<IcebergDeleteFileFilter> deleteFileFilters;
+ private Map<String, String> config;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]