This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4976021bf7 [Enhancement] Doris broker support aliyun-oss #13665 
(#14305)
4976021bf7 is described below

commit 4976021bf70ace17738e5e55025308c7750bacc5
Author: 周翱 <[email protected]>
AuthorDate: Mon Nov 21 10:29:14 2022 +0800

    [Enhancement] Doris broker support aliyun-oss #13665 (#14305)
---
 .../java/org/apache/doris/analysis/ExportStmt.java |  3 +-
 fs_brokers/apache_hdfs_broker/pom.xml              |  8 +++-
 .../doris/broker/hdfs/FileSystemManager.java       | 51 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 1a875f0def..e36416fc03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -239,9 +239,10 @@ public class ExportStmt extends StatementBase {
             if (schema == null || (!schema.equalsIgnoreCase("hdfs")
                     && !schema.equalsIgnoreCase("ofs")
                     && !schema.equalsIgnoreCase("obs")
+                    && !schema.equalsIgnoreCase("oss")
                     && !schema.equalsIgnoreCase("s3a"))) {
                 throw new AnalysisException("Invalid broker path. please use 
valid 'hdfs://', 'ofs://', 'obs://',"
-                    + " or 's3a://' path.");
+                    + "'oss://'," + " or 's3a://' path.");
             }
         } else if (type == StorageBackend.StorageType.S3) {
             if (schema == null || !schema.equalsIgnoreCase("s3")) {
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml 
b/fs_brokers/apache_hdfs_broker/pom.xml
index 8aaca1aad8..92361ab6cd 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -69,7 +69,7 @@ under the License.
         <maven.compiler.target>1.8</maven.compiler.target>
         <log4j2.version>2.18.0</log4j2.version>
         <project.scm.id>github</project.scm.id>
-        <hadoop.version>2.8.3</hadoop.version>
+        <hadoop.version>2.9.1</hadoop.version>
     </properties>
     <profiles>
         <!-- for custom internal repository -->
@@ -284,6 +284,12 @@ under the License.
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-huaweicloud</artifactId>
+            <version>2.8.3</version>
+        </dependency>
+        <!-- 
https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aliyun -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aliyun</artifactId>
             <version>${hadoop.version}</version>
         </dependency>
     </dependencies>
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index af72395a33..5b1ef33491 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -69,6 +69,7 @@ public class FileSystemManager {
     private static final String KS3_SCHEME = "ks3";
     private static final String CHDFS_SCHEME = "ofs";
     private static final String OBS_SCHEME = "obs";
+    private static final String OSS_SCHEME = "oss";
 
     private static final String USER_NAME_KEY = "username";
     private static final String PASSWORD_KEY = "password";
@@ -115,6 +116,14 @@ public class FileSystemManager {
     // This property is used like 'fs.ks3.impl.disable.cache'
     private static final String FS_KS3_IMPL_DISABLE_CACHE = 
"fs.ks3.impl.disable.cache";
 
+    // arguments for oss
+    private static final String FS_OSS_ACCESS_KEY = "fs.oss.accessKeyId";
+    private static final String FS_OSS_SECRET_KEY = "fs.oss.accessKeySecret";
+    private static final String FS_OSS_ENDPOINT = "fs.oss.endpoint";
+    // This property is used like 'fs.oss.impl.disable.cache'
+    private static final String FS_OSS_IMPL_DISABLE_CACHE = 
"fs.oss.impl.disable.cache";
+    private static final String FS_OSS_IMPL = "fs.oss.impl";
+
     private ScheduledExecutorService handleManagementPool = 
Executors.newScheduledThreadPool(2);
 
     private int readBufferSize = 128 << 10; // 128k
@@ -176,6 +185,8 @@ public class FileSystemManager {
             brokerFileSystem = getChdfsFileSystem(path, properties);
         } else if (scheme.equals(OBS_SCHEME)) {
             brokerFileSystem = getOBSFileSystem(path, properties);
+        } else if (scheme.equals(OSS_SCHEME)) {
+            brokerFileSystem = getOSSFileSystem(path, properties);
         } else {
             throw new 
BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                 "invalid path. scheme is not supported");
@@ -514,6 +525,46 @@ public class FileSystemManager {
         }
     }
 
+   /**
+     * file system handle is cached, the identity is endpoint + bucket + 
accessKey_secretKey
+     * @param path
+     * @param properties
+     * @return
+     */
+    public BrokerFileSystem getOSSFileSystem(String path, Map<String, String> 
properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        String accessKey = properties.getOrDefault(FS_OSS_ACCESS_KEY, "");
+        String secretKey = properties.getOrDefault(FS_OSS_SECRET_KEY, "");
+        String endpoint = properties.getOrDefault(FS_OSS_ENDPOINT, "");
+        String disableCache = 
properties.getOrDefault(FS_OSS_IMPL_DISABLE_CACHE, "true");
+        String host = OSS_SCHEME + "://" + endpoint + "/" + 
pathUri.getUri().getHost();
+        String obsUgi = accessKey + "," + secretKey;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, 
obsUgi);
+        cachedFileSystem.putIfAbsent(fileSystemIdentity, new 
BrokerFileSystem(fileSystemIdentity));
+        BrokerFileSystem fileSystem = 
updateCachedFileSystem(fileSystemIdentity, properties);
+        fileSystem.getLock().lock();
+        try {
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("create file system for new path " + path);
+                // create a new filesystem
+                Configuration conf = new Configuration();
+                conf.set(FS_OSS_ACCESS_KEY, accessKey);
+                conf.set(FS_OSS_SECRET_KEY, secretKey);
+                conf.set(FS_OSS_ENDPOINT, endpoint);
+                conf.set(FS_OSS_IMPL, 
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+                conf.set(FS_OSS_IMPL_DISABLE_CACHE, disableCache);
+                FileSystem ossFileSystem = FileSystem.get(pathUri.getUri(), 
conf);
+                fileSystem.setFileSystem(ossFileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
     /**
      * visible for test
      *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to