This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 08e71e5f84 [fix](multi-catalog)compatible with hdfs HA empty prefix (#22480)
08e71e5f84 is described below
commit 08e71e5f8489a93262b30b1371fb168a590d0b60
Author: slothever <[email protected]>
AuthorDate: Wed Aug 2 09:52:20 2023 +0800
[fix](multi-catalog)compatible with hdfs HA empty prefix (#22480)
cherry-pick from master,
#22424
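The change normalizes HDFS locations that arrive with an empty host, such as 'hdfs:///path' or 'hdfs:/path', by filling in the HA nameservice from the catalog's 'dfs.nameservices' property (see the S3Util change below). A minimal illustrative sketch of the intended behavior; the HdfsHaPrefixExample class and the sample paths are hypothetical, while S3Util.convertToS3IfNecessary and the 'dfs.nameservices' key come from this patch:
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.doris.common.util.S3Util;

public class HdfsHaPrefixExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("dfs.nameservices", "your-nameservice");

        // An empty-host HDFS prefix is rewritten to carry the HA nameservice.
        System.out.println(S3Util.convertToS3IfNecessary("hdfs:/user/hive/warehouse/t", props));
        // expected: hdfs://your-nameservice/user/hive/warehouse/t

        // Without dfs.nameservices, 'hdfs:/path' becomes a bare '/path' so the BE
        // falls back to the local NameNode; 'hdfs:///path' is rejected with a RuntimeException.
        System.out.println(S3Util.convertToS3IfNecessary("hdfs:/user/hive/warehouse/t", new HashMap<>()));
        // expected: /user/hive/warehouse/t
    }
}
```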
---
docs/en/docs/lakehouse/multi-catalog/iceberg.md | 15 +++++++
docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md | 15 +++++++
.../org/apache/doris/catalog/HdfsResource.java | 1 +
.../java/org/apache/doris/common/util/S3Util.java | 50 ++++++++++++++++++++--
.../doris/datasource/hive/HiveMetaStoreCache.java | 7 +--
.../planner/external/iceberg/IcebergScanNode.java | 7 +--
.../planner/external/iceberg/IcebergSplit.java | 6 ++-
7 files changed, 91 insertions(+), 10 deletions(-)
diff --git a/docs/en/docs/lakehouse/multi-catalog/iceberg.md b/docs/en/docs/lakehouse/multi-catalog/iceberg.md
index 4509fbc4fa..54af57bee8 100644
--- a/docs/en/docs/lakehouse/multi-catalog/iceberg.md
+++ b/docs/en/docs/lakehouse/multi-catalog/iceberg.md
@@ -93,11 +93,26 @@ see [Alibaba Cloud DLF Catalog](dlf.md)
This method requires a REST service to be provided in advance, and the user needs to implement the REST interface for obtaining Iceberg metadata.
+```sql
+CREATE CATALOG iceberg PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://172.21.0.1:8181'
+);
+```
+
+If the data is on HDFS and High Availability (HA) is set up, you need to add the HA configuration to the Catalog.
+
```sql
CREATE CATALOG iceberg PROPERTIES (
'type'='iceberg',
'iceberg.catalog.type'='rest',
'uri' = 'http://172.21.0.1:8181',
+ 'dfs.nameservices'='your-nameservice',
+ 'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
+ 'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.1:8020',
+ 'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.2:8020',
+
'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);
```
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
index c93fca28e5..ab5b447005 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
@@ -93,11 +93,26 @@ Iceberg 属性详情参见 [Iceberg Glue Catalog](https://iceberg.apache.org/doc
该方式需要预先提供REST服务,用户需实现获取Iceberg元数据的REST接口。
+```sql
+CREATE CATALOG iceberg PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://172.21.0.1:8181'
+);
+```
+
+如果使用HDFS存储数据,并开启了高可用模式,还需在Catalog中增加HDFS高可用配置:
+
```sql
CREATE CATALOG iceberg PROPERTIES (
'type'='iceberg',
'iceberg.catalog.type'='rest',
'uri' = 'http://172.21.0.1:8181',
+ 'dfs.nameservices'='your-nameservice',
+ 'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
+ 'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.1:8020',
+ 'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.2:8020',
+
'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);
```
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index cdfb169590..9735f2f059 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -53,6 +53,7 @@ public class HdfsResource extends Resource {
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
public static String DSF_NAMESERVICES = "dfs.nameservices";
+ public static final String HDFS_PREFIX = "hdfs:";
@SerializedName(value = "properties")
private Map<String, String> properties;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index 64c897c306..623e699fb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -17,9 +17,11 @@
package org.apache.doris.common.util;
+import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.credentials.CloudCredential;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -38,7 +40,9 @@ import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import java.net.URI;
+import java.net.URISyntaxException;
import java.time.Duration;
+import java.util.Map;
public class S3Util {
private static final Logger LOG = LogManager.getLogger(S3Util.class);
@@ -63,7 +67,7 @@ public class S3Util {
* @param location origin location
* @return metadata location path. just convert when storage is compatible with s3 client.
*/
- public static String convertToS3IfNecessary(String location) {
+ public static String convertToS3IfNecessary(String location, Map<String, String> props) {
LOG.debug("try convert location to s3 prefix: " + location);
if (isObjStorageUseS3Client(location)) {
int pos = location.indexOf("://");
@@ -72,6 +76,46 @@ public class S3Util {
}
return "s3" + location.substring(pos);
}
+ return normalizedLocation(location, props);
+ }
+
+ private static String normalizedLocation(String location, Map<String, String> props) {
+ try {
+ if (location.startsWith(HdfsResource.HDFS_PREFIX)) {
+ return normalizedHdfsPath(location, props);
+ }
+ return location;
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ private static String normalizedHdfsPath(String location, Map<String, String> props) throws URISyntaxException {
+ URI normalizedUri = new URI(location);
+ // compatible with 'hdfs:///' or 'hdfs:/'
+ if (StringUtils.isEmpty(normalizedUri.getHost())) {
+ String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//";
+ String brokenPrefix = HdfsResource.HDFS_PREFIX + "/";
+ if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) {
+ location = location.replace(brokenPrefix, normalizedPrefix);
+ }
+ // Need to add the hdfs host to the location
+ String host = props.get(HdfsResource.DSF_NAMESERVICES);
+ if (StringUtils.isNotEmpty(host)) {
+ // Replace 'hdfs://key/' with 'hdfs://name_service/key/'
+ // or 'hdfs:///abc' with 'hdfs://name_service/abc'
+ return location.replace(normalizedPrefix, normalizedPrefix + host + "/");
+ } else {
+ // 'hdfs://null/' is equivalent to 'hdfs:///'
+ if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) {
+ // Do not support hdfs:///location
+ throw new RuntimeException("Invalid location with empty host: " + location);
+ } else {
+ // Replace 'hdfs://key/' with '/key/' and try to access the local NameNode on BE.
+ return location.replace(normalizedPrefix, "/");
+ }
+ }
+ }
return location;
}
@@ -80,7 +124,7 @@ public class S3Util {
* @param location origin split path
* @return BE scan range path
*/
- public static Path toScanRangeLocation(String location) {
+ public static Path toScanRangeLocation(String location, Map<String, String> props) {
// All storage will use s3 client on BE.
if (isObjStorage(location)) {
int pos = location.indexOf("://");
@@ -95,7 +139,7 @@ public class S3Util {
location = "s3" + location.substring(pos);
}
}
- return new Path(location);
+ return new Path(normalizedLocation(location, props));
}
public static boolean isHdfsOnOssEndpoint(String location) {
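Both public entry points now take the catalog properties, and the callers in the files below thread catalog.getProperties() through. A self-contained caller-side sketch; the ScanRangeLocationExample class and the sample split path are hypothetical, and the properties map stands in for catalog.getProperties():
```java
import java.util.Collections;
import java.util.Map;

import org.apache.doris.common.util.S3Util;
import org.apache.hadoop.fs.Path;

public class ScanRangeLocationExample {
    public static void main(String[] args) {
        // Stand-in for catalog.getProperties() as used in HiveMetaStoreCache / IcebergScanNode.
        Map<String, String> props = Collections.singletonMap("dfs.nameservices", "your-nameservice");

        // Split paths are normalized before being handed to the BE as scan ranges.
        Path scanPath = S3Util.toScanRangeLocation("hdfs:/user/hive/warehouse/t/part-0.parquet", props);
        System.out.println(scanPath);
        // expected: hdfs://your-nameservice/user/hive/warehouse/t/part-0.parquet
    }
}
```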
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 0bd190d945..e1fa35d07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -378,7 +378,7 @@ public class HiveMetaStoreCache {
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
for (RemoteFile remoteFile : locatedFiles.files()) {
Path srcPath = remoteFile.getPath();
- Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString());
+ Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString(), catalog.getProperties());
if (!convertedPath.toString().equals(srcPath.toString())) {
remoteFile.setPath(convertedPath);
}
@@ -403,7 +403,7 @@ public class HiveMetaStoreCache {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
- String finalLocation = S3Util.convertToS3IfNecessary(key.location);
+ String finalLocation = S3Util.convertToS3IfNecessary(key.location, catalog.getProperties());
// disable the fs cache in FileSystem, or it will always create a new FileSystem
// and save it in the cache when calling FileInputFormat.setInputPaths().
try {
@@ -437,7 +437,8 @@ public class HiveMetaStoreCache {
for (int i = 0; i < splits.length; i++) {
org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
// todo: get modification time
- Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString());
+ Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString(),
+ catalog.getProperties());
result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null));
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 3d3634fb66..23bd919461 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -125,7 +125,7 @@ public class IcebergScanNode extends FileQueryScanNode {
for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
String deleteFilePath = filter.getDeleteFilePath();
- deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath).toString());
+ deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath, icebergSplit.getConfig()).toString());
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
fileDesc.setContent(FileContent.POSITION_DELETES.id());
IcebergDeleteFileFilter.PositionDelete positionDelete =
@@ -188,13 +188,14 @@ public class IcebergScanNode extends FileQueryScanNode {
TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
String dataFilePath = splitTask.file().path().toString();
- Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath);
+ Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
IcebergSplit split = new IcebergSplit(
finalDataFilePath,
splitTask.start(),
splitTask.length(),
splitTask.file().fileSizeInBytes(),
- new String[0]);
+ new String[0],
+ source.getCatalog().getProperties());
split.setFormatVersion(formatVersion);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 9064017088..de3f2ec6aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -23,16 +23,20 @@ import lombok.Data;
import org.apache.hadoop.fs.Path;
import java.util.List;
+import java.util.Map;
@Data
public class IcebergSplit extends FileSplit {
// File path will be changed if the file is modified, so there's no need to get modification time.
- public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts) {
+ public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
+ Map<String, String> config) {
super(file, start, length, fileLength, hosts, null);
+ this.config = config;
}
private Integer formatVersion;
private List<IcebergDeleteFileFilter> deleteFileFilters;
+ private Map<String, String> config;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]