This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4976021bf7 [Enhancement] Doris broker support aliyun-oss #13665
(#14305)
4976021bf7 is described below
commit 4976021bf70ace17738e5e55025308c7750bacc5
Author: 周翱 <[email protected]>
AuthorDate: Mon Nov 21 10:29:14 2022 +0800
[Enhancement] Doris broker support aliyun-oss #13665 (#14305)
---
.../java/org/apache/doris/analysis/ExportStmt.java | 3 +-
fs_brokers/apache_hdfs_broker/pom.xml | 8 +++-
.../doris/broker/hdfs/FileSystemManager.java | 51 ++++++++++++++++++++++
3 files changed, 60 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 1a875f0def..e36416fc03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -239,9 +239,10 @@ public class ExportStmt extends StatementBase {
if (schema == null || (!schema.equalsIgnoreCase("hdfs")
&& !schema.equalsIgnoreCase("ofs")
&& !schema.equalsIgnoreCase("obs")
+ && !schema.equalsIgnoreCase("oss")
&& !schema.equalsIgnoreCase("s3a"))) {
throw new AnalysisException("Invalid broker path. please use
valid 'hdfs://', 'ofs://', 'obs://',"
- + " or 's3a://' path.");
+ + "'oss://'," + " or 's3a://' path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml
b/fs_brokers/apache_hdfs_broker/pom.xml
index 8aaca1aad8..92361ab6cd 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -69,7 +69,7 @@ under the License.
<maven.compiler.target>1.8</maven.compiler.target>
<log4j2.version>2.18.0</log4j2.version>
<project.scm.id>github</project.scm.id>
- <hadoop.version>2.8.3</hadoop.version>
+ <hadoop.version>2.9.1</hadoop.version>
</properties>
<profiles>
<!-- for custom internal repository -->
@@ -284,6 +284,12 @@ under the License.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-huaweicloud</artifactId>
+ <version>2.8.3</version>
+ </dependency>
+ <!--
https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aliyun -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aliyun</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index af72395a33..5b1ef33491 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -69,6 +69,7 @@ public class FileSystemManager {
private static final String KS3_SCHEME = "ks3";
private static final String CHDFS_SCHEME = "ofs";
private static final String OBS_SCHEME = "obs";
+ private static final String OSS_SCHEME = "oss";
private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
@@ -115,6 +116,14 @@ public class FileSystemManager {
// This property is used like 'fs.ks3.impl.disable.cache'
private static final String FS_KS3_IMPL_DISABLE_CACHE =
"fs.ks3.impl.disable.cache";
+ // arguments for oss
+ private static final String FS_OSS_ACCESS_KEY = "fs.oss.accessKeyId";
+ private static final String FS_OSS_SECRET_KEY = "fs.oss.accessKeySecret";
+ private static final String FS_OSS_ENDPOINT = "fs.oss.endpoint";
+ // This property is used like 'fs.oss.impl.disable.cache'
+ private static final String FS_OSS_IMPL_DISABLE_CACHE =
"fs.oss.impl.disable.cache";
+ private static final String FS_OSS_IMPL = "fs.oss.impl";
+
private ScheduledExecutorService handleManagementPool =
Executors.newScheduledThreadPool(2);
private int readBufferSize = 128 << 10; // 128k
@@ -176,6 +185,8 @@ public class FileSystemManager {
brokerFileSystem = getChdfsFileSystem(path, properties);
} else if (scheme.equals(OBS_SCHEME)) {
brokerFileSystem = getOBSFileSystem(path, properties);
+ } else if (scheme.equals(OSS_SCHEME)) {
+ brokerFileSystem = getOSSFileSystem(path, properties);
} else {
throw new
BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
@@ -514,6 +525,46 @@ public class FileSystemManager {
}
}
+ /**
+ * Returns the cached {@code BrokerFileSystem} handle for an Aliyun OSS path,
+ * creating the underlying Hadoop {@code FileSystem} the first time the handle
+ * is found without one.
+ *
+ * <p>The cache identity is built from two strings: the host part
+ * {@code "oss://" + endpoint + "/" + <URI host of path>} (the URI host is
+ * presumably the bucket name) and the credential part
+ * {@code accessKey + "," + secretKey}.
+ *
+ * <p>Values read from {@code properties}: {@code FS_OSS_ACCESS_KEY},
+ * {@code FS_OSS_SECRET_KEY} and {@code FS_OSS_ENDPOINT} (each defaulting to the
+ * empty string), and {@code FS_OSS_IMPL_DISABLE_CACHE} (defaulting to
+ * {@code "true"}). They are copied into the Hadoop {@code Configuration}
+ * together with {@code FS_OSS_IMPL} set to the AliyunOSSFileSystem class name.
+ *
+ * <p>The handle's lock is held while the underlying file system is checked and
+ * created, and is released in the {@code finally} block.
+ *
+ * @param path OSS path whose URI host is used in the cache identity and whose
+ * URI is passed to {@code FileSystem.get}
+ * @param properties broker properties carrying the OSS credentials, endpoint
+ * and cache switch
+ * @return the broker file system handle associated with the computed identity
+ * @throws BrokerException with status {@code NOT_AUTHORIZED} when any exception
+ * is raised while checking or creating the underlying file system
+ */
+ public BrokerFileSystem getOSSFileSystem(String path, Map<String, String>
properties) {
+ WildcardURI pathUri = new WildcardURI(path);
+ String accessKey = properties.getOrDefault(FS_OSS_ACCESS_KEY, "");
+ String secretKey = properties.getOrDefault(FS_OSS_SECRET_KEY, "");
+ String endpoint = properties.getOrDefault(FS_OSS_ENDPOINT, "");
+ String disableCache =
properties.getOrDefault(FS_OSS_IMPL_DISABLE_CACHE, "true");
+ String host = OSS_SCHEME + "://" + endpoint + "/" +
pathUri.getUri().getHost();
+ String obsUgi = accessKey + "," + secretKey;
+ FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host,
obsUgi);
+ cachedFileSystem.putIfAbsent(fileSystemIdentity, new
BrokerFileSystem(fileSystemIdentity));
+ BrokerFileSystem fileSystem =
updateCachedFileSystem(fileSystemIdentity, properties);
+ fileSystem.getLock().lock();
+ try {
+ if (fileSystem.getDFSFileSystem() == null) {
+ logger.info("create file system for new path " + path);
+ // create a new filesystem
+ Configuration conf = new Configuration();
+ conf.set(FS_OSS_ACCESS_KEY, accessKey);
+ conf.set(FS_OSS_SECRET_KEY, secretKey);
+ conf.set(FS_OSS_ENDPOINT, endpoint);
+ conf.set(FS_OSS_IMPL,
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+ conf.set(FS_OSS_IMPL_DISABLE_CACHE, disableCache);
+ FileSystem ossFileSystem = FileSystem.get(pathUri.getUri(),
conf);
+ fileSystem.setFileSystem(ossFileSystem);
+ }
+ return fileSystem;
+ } catch (Exception e) {
+ logger.error("errors while connect to " + path, e);
+ throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+ } finally {
+ fileSystem.getLock().unlock();
+ }
+ }
+
/**
* visible for test
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]