This is an automated email from the ASF dual-hosted git repository.

xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 25480ae9e7 [Feature][Resource Center] Add support for Huawei Cloud OBS 
(#14643)
25480ae9e7 is described below

commit 25480ae9e75d900a969bd2df67c6e136089b6691
Author: Edison Catto <[email protected]>
AuthorDate: Mon Aug 14 13:25:48 2023 +0800

    [Feature][Resource Center] Add support for Huawei Cloud OBS (#14643)
    
    * [Feature][Resource Center] Add support for Huawei Cloud OBS as storage of 
resource center
    
    * add license and add doc
    
    * add 3-party dependency license
    
    * Update LICENSE
    
    * fix
    
    * Update pom.xml
    
    * fix
    
    * fix
    
    ---------
    
    Co-authored-by: sunkang <[email protected]>
    Co-authored-by: xiangzihao <[email protected]>
    Co-authored-by: Rick Cheng <[email protected]>
---
 docs/docs/en/guide/resource/configuration.md       |  13 +-
 docs/docs/zh/guide/resource/configuration.md       |  13 +-
 dolphinscheduler-bom/pom.xml                       |  18 +
 dolphinscheduler-common/pom.xml                    |   5 +
 .../common/constants/Constants.java                |   3 +
 .../common/enums/ResUploadType.java                |   2 +-
 .../src/main/resources/common.properties           |  13 +-
 dolphinscheduler-dist/release-docs/LICENSE         |   1 +
 .../licenses/LICENSE-esdk-obs-java-bundle.txt      | 202 ++++++++
 .../dolphinscheduler-storage-all/pom.xml           |   5 +
 .../plugin/storage/api/StorageType.java            |   4 +-
 .../pom.xml                                        |  28 +-
 .../plugin/storage/obs/ObsStorageOperator.java     | 513 +++++++++++++++++++++
 .../storage/obs/ObsStorageOperatorFactory.java}    |  45 +-
 .../plugin/storage/obs/ObsStorageOperatorTest.java | 291 ++++++++++++
 dolphinscheduler-storage-plugin/pom.xml            |   1 +
 .../plugin/task/api/TaskConstants.java             |   6 +
 tools/dependencies/known-dependencies.txt          |   1 +
 18 files changed, 1106 insertions(+), 58 deletions(-)

diff --git a/docs/docs/en/guide/resource/configuration.md 
b/docs/docs/en/guide/resource/configuration.md
index d8e13770b2..a4bb8cef42 100644
--- a/docs/docs/en/guide/resource/configuration.md
+++ b/docs/docs/en/guide/resource/configuration.md
@@ -1,7 +1,7 @@
 # Resource Center Configuration
 
 - You could use `Resource Center` to upload text files, UDFs and other 
task-related files.
-- You could configure `Resource Center` to use distributed file system like 
[Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+), 
[MinIO](https://github.com/minio/minio) cluster or remote storage products like 
[AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud 
OSS](https://www.aliyun.com/product/oss), etc.
+- You could configure `Resource Center` to use distributed file system like 
[Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+), 
[MinIO](https://github.com/minio/minio) cluster or remote storage products like 
[AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud 
OSS](https://www.aliyun.com/product/oss), [Huawei Cloud 
OBS](https://support.huaweicloud.com/obs/index.html), etc.
 - You could configure `Resource Center` to use local file system. If you 
deploy `DolphinScheduler` in `Standalone` mode, you could configure it to use 
local file system for `Resouce Center` without the need of an external `HDFS` 
system or `S3`.
 - Furthermore, if you deploy `DolphinScheduler` in `Cluster` mode, you could 
use [S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse) to mount `S3` or 
[JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html) to mount 
`OSS` to your machines and use the local file system for `Resouce Center`. In 
this way, you could operate remote files as if on your local machines.
 
@@ -80,7 +80,7 @@ data.basedir.path=/tmp/dolphinscheduler
 # resource view suffixs
 
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
 
-# resource storage type: HDFS, S3, OSS, GCS, ABS, NONE
+# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
 resource.storage.type=NONE
 # resource store on HDFS/S3/OSS path, resource file will store to this base 
path, self configuration, please make sure the directory exists on hdfs and 
have read write permissions. "/dolphinscheduler" is recommended
 resource.storage.upload.base.path=/tmp/dolphinscheduler
@@ -107,6 +107,15 @@ resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
 # oss bucket endpoint, required if you set resource.storage.type=OSS
 resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com
 
+# huawei cloud access key id, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.id=<your-access-key-id>
+# huawei cloud access key secret, required if you set 
resource.storage.type=OBS
+resource.huawei.cloud.access.key.secret=<your-access-key-secret>
+# obs bucket name, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.bucket.name=dolphinscheduler
+# obs bucket endpoint, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
+
 # if resource.storage.type=HDFS, the user must have the permission to create 
directories under the HDFS root path
 resource.hdfs.root.user=hdfs
 # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if 
resource.storage.type=HDFS and namenode HA is enabled, you need to copy 
core-site.xml and hdfs-site.xml to conf dir
diff --git a/docs/docs/zh/guide/resource/configuration.md 
b/docs/docs/zh/guide/resource/configuration.md
index 43ee13966e..25bacf90a0 100644
--- a/docs/docs/zh/guide/resource/configuration.md
+++ b/docs/docs/zh/guide/resource/configuration.md
@@ -1,7 +1,7 @@
 # 资源中心配置详情
 
 - 资源中心通常用于上传文件、UDF 函数,以及任务组管理等操作。
-- 
资源中心可以对接分布式的文件存储系统,如[Hadoop](https://hadoop.apache.org/docs/r2.7.0/)(2.6+)或者[MinIO](https://github.com/minio/minio)集群,也可以对接远端的对象存储,如[AWS
 S3](https://aws.amazon.com/s3/)或者[阿里云 
OSS](https://www.aliyun.com/product/oss)等。
+- 
资源中心可以对接分布式的文件存储系统,如[Hadoop](https://hadoop.apache.org/docs/r2.7.0/)(2.6+)或者[MinIO](https://github.com/minio/minio)集群,也可以对接远端的对象存储,如[AWS
 S3](https://aws.amazon.com/s3/)或者[阿里云 
OSS](https://www.aliyun.com/product/oss),[华为云 
OBS](https://support.huaweicloud.com/obs/index.html) 等。
 - 资源中心也可以直接对接本地文件系统。在单机模式下,您无需依赖`Hadoop`或`S3`一类的外部存储系统,可以方便地对接本地文件系统进行体验。
 - 
除此之外,对于集群模式下的部署,您可以通过使用[S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse)将`S3`挂载到本地,或者使用[JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html)将`OSS`挂载到本地等,再用资源中心对接本地文件系统方式来操作远端对象存储中的文件。
 
@@ -79,7 +79,7 @@ resource.aws.s3.endpoint=
 # user data local directory path, please make sure the directory exists and 
have read write permissions
 data.basedir.path=/tmp/dolphinscheduler
 
-# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS
+# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
 resource.storage.type=LOCAL
 
 # resource store on HDFS/S3/OSS path, resource file will store to this hadoop 
hdfs path, self configuration,
@@ -108,6 +108,15 @@ resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
 # oss bucket endpoint, required if you set resource.storage.type=OSS
 resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com
 
+# huawei cloud access key id, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.id=<your-access-key-id>
+# huawei cloud access key secret, required if you set 
resource.storage.type=OBS
+resource.huawei.cloud.access.key.secret=<your-access-key-secret>
+# obs bucket name, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.bucket.name=dolphinscheduler
+# obs bucket endpoint, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
+
 # if resource.storage.type=HDFS, the user must have the permission to create 
directories under the HDFS root path
 resource.hdfs.root.user=root
 # if resource.storage.type=S3, the value like: s3a://dolphinscheduler;
diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml
index 7af68b6701..8bb9481fc8 100644
--- a/dolphinscheduler-bom/pom.xml
+++ b/dolphinscheduler-bom/pom.xml
@@ -115,6 +115,7 @@
         <casdoor.version>1.6.0</casdoor.version>
         <azure-sdk-bom.version>1.2.10</azure-sdk-bom.version>
         <protobuf.version>3.17.2</protobuf.version>
+        <esdk-obs.version>3.23.3</esdk-obs.version>
         <system-lambda.version>1.2.1</system-lambda.version>
     </properties>
 
@@ -896,6 +897,23 @@
                 <artifactId>protobuf-java</artifactId>
                 <version>${protobuf.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>com.huaweicloud</groupId>
+                <artifactId>esdk-obs-java-bundle</artifactId>
+                <version>${esdk-obs.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
             <dependency>
                 <groupId>com.github.stefanbirkner</groupId>
                 <artifactId>system-lambda</artifactId>
diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml
index 096306fa8c..0254e6c43a 100644
--- a/dolphinscheduler-common/pom.xml
+++ b/dolphinscheduler-common/pom.xml
@@ -93,6 +93,11 @@
             <artifactId>aws-java-sdk-s3</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.huaweicloud</groupId>
+            <artifactId>esdk-obs-java-bundle</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.github.oshi</groupId>
             <artifactId>oshi-core</artifactId>
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
index f2260c30cf..e98f5f202f 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
@@ -147,6 +147,9 @@ public final class Constants {
 
     public static final String AZURE_BLOB_STORAGE_ACCOUNT_NAME = 
"resource.azure.blob.storage.account.name";
 
+    public static final String HUAWEI_CLOUD_OBS_BUCKET_NAME = 
"resource.huawei.cloud.obs.bucket.name";
+    public static final String HUAWEI_CLOUD_OBS_END_POINT = 
"resource.huawei.cloud.obs.endpoint";
+
     /**
      * fetch applicationId way
      */
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java
index 2ebc2e1a5c..143cb12a35 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java
@@ -21,5 +21,5 @@ package org.apache.dolphinscheduler.common.enums;
  * data base types
  */
 public enum ResUploadType {
-    LOCAL, HDFS, S3, OSS, GCS, ABS, NONE
+    LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
 }
diff --git a/dolphinscheduler-common/src/main/resources/common.properties 
b/dolphinscheduler-common/src/main/resources/common.properties
index 43a1338152..ea4c126beb 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -21,7 +21,7 @@ data.basedir.path=/tmp/dolphinscheduler
 # resource view suffixs
 
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
 
-# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, NONE. LOCAL type is a 
specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
+# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE. LOCAL type 
is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" 
configuration
 # please notice that LOCAL mode does not support reading and writing in 
distributed mode, which mean you can only use your resource in one machine, 
unless
 # use shared file mount point
 resource.storage.type=LOCAL
@@ -73,6 +73,17 @@ resource.azure.blob.storage.account.name=<your-account-name>
 # abs connection string, required if you set resource.storage.type=ABS
 resource.azure.blob.storage.connection.string=<your-connection-string>
 
+
+# huawei cloud access key id, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.id=<your-access-key-id>
+# huawei cloud access key secret, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.secret=<your-access-key-secret>
+# obs bucket name, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.bucket.name=dolphinscheduler
+# obs bucket endpoint, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
+
+
 # if resource.storage.type=HDFS, the user must have the permission to create 
directories under the HDFS root path
 resource.hdfs.root.user=hdfs
 # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if 
resource.storage.type=HDFS and namenode HA is enabled, you need to copy 
core-site.xml and hdfs-site.xml to conf dir
diff --git a/dolphinscheduler-dist/release-docs/LICENSE 
b/dolphinscheduler-dist/release-docs/LICENSE
index ffc1a39ec6..ab26aeda4a 100644
--- a/dolphinscheduler-dist/release-docs/LICENSE
+++ b/dolphinscheduler-dist/release-docs/LICENSE
@@ -567,6 +567,7 @@ The text of each license is also included at 
licenses/LICENSE-[project].txt.
     casdoor-spring-boot-starter 1.6.0  
https://mvnrepository.com/artifact/org.casbin/casdoor-spring-boot-starter/1.6.0 
 Apache 2.0
     org.apache.oltu.oauth2.client 1.0.2  
https://mvnrepository.com/artifact/org.apache.oltu.oauth2/org.apache.oltu.oauth2.client/1.0.2
  Apache 2.0
     org.apache.oltu.oauth2.common 1.0.2  
https://mvnrepository.com/artifact/org.apache.oltu.oauth2/org.apache.oltu.oauth2.common/1.0.2
  Apache 2.0
+    esdk-obs-java-bundle 3.23.3 
https://mvnrepository.com/artifact/com.huaweicloud/esdk-obs-java-bundle/3.23.3  
Apache 2.0
 
 
 
diff --git 
a/dolphinscheduler-dist/release-docs/licenses/LICENSE-esdk-obs-java-bundle.txt 
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-esdk-obs-java-bundle.txt
new file mode 100644
index 0000000000..d645695673
--- /dev/null
+++ 
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-esdk-obs-java-bundle.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git 
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
index a85fcd3c4e..4d6edcd1cc 100644
--- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
@@ -57,6 +57,11 @@
             <artifactId>dolphinscheduler-storage-abs</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-obs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
index 33c34f4c53..7ead2c8a94 100644
--- 
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
+++ 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
@@ -27,7 +27,9 @@ public enum StorageType {
     S3(3, "S3"),
     GCS(4, "GCS"),
 
-    ABS(5, "ABS");
+    ABS(5, "ABS"),
+
+    OBS(6, "OBS");
 
     private final int code;
     private final String name;
diff --git 
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/pom.xml
similarity index 64%
copy from dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
copy to dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/pom.xml
index a85fcd3c4e..d19923498b 100644
--- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/pom.xml
@@ -24,39 +24,23 @@
         <version>dev-SNAPSHOT</version>
     </parent>
 
-    <artifactId>dolphinscheduler-storage-all</artifactId>
+    <artifactId>dolphinscheduler-storage-obs</artifactId>
 
     <dependencies>
         <dependency>
-            <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-storage-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-storage-s3</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-storage-hdfs</artifactId>
-            <version>${project.version}</version>
+            <groupId>com.huaweicloud</groupId>
+            <artifactId>esdk-obs-java-bundle</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-storage-oss</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-storage-gcs</artifactId>
+            <artifactId>dolphinscheduler-storage-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-storage-abs</artifactId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
             <version>${project.version}</version>
         </dependency>
     </dependencies>
-
 </project>
diff --git 
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperator.java
 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperator.java
new file mode 100644
index 0000000000..6dc5318519
--- /dev/null
+++ 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperator.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.storage.obs;
+
+import static 
org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
+import static 
org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
+import static 
org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_FILE;
+import static 
org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_UDF;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.ResUploadType;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import com.obs.services.ObsClient;
+import com.obs.services.exception.ObsException;
+import com.obs.services.internal.ServiceException;
+import com.obs.services.model.DeleteObjectsRequest;
+import com.obs.services.model.GetObjectRequest;
+import com.obs.services.model.ListObjectsRequest;
+import com.obs.services.model.ObjectListing;
+import com.obs.services.model.ObjectMetadata;
+import com.obs.services.model.ObsObject;
+import com.obs.services.model.PutObjectRequest;
+
+@Data
+@Slf4j
+public class ObsStorageOperator implements Closeable, StorageOperate {
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String bucketName;
+
+    private String endPoint;
+
+    private ObsClient obsClient;
+
+    public ObsStorageOperator() {
+    }
+
+    public void init() {
+        this.accessKeyId = readObsAccessKeyID();
+        this.accessKeySecret = readObsAccessKeySecret();
+        this.endPoint = readObsEndPoint();
+        this.bucketName = readObsBucketName();
+        this.obsClient = buildObsClient();
+        ensureBucketSuccessfullyCreated(bucketName);
+    }
+
+    protected String readObsAccessKeyID() {
+        return 
PropertyUtils.getString(TaskConstants.HUAWEI_CLOUD_ACCESS_KEY_ID);
+    }
+
+    protected String readObsAccessKeySecret() {
+        return 
PropertyUtils.getString(TaskConstants.HUAWEI_CLOUD_ACCESS_KEY_SECRET);
+    }
+
+    protected String readObsBucketName() {
+        return PropertyUtils.getString(Constants.HUAWEI_CLOUD_OBS_BUCKET_NAME);
+    }
+
+    protected String readObsEndPoint() {
+        return PropertyUtils.getString(Constants.HUAWEI_CLOUD_OBS_END_POINT);
+    }
+
+    /**
+     * Closes the underlying OBS client, if one has been created.
+     *
+     * @throws IOException if the client fails to close
+     */
+    @Override
+    public void close() throws IOException {
+        // the client is only assigned in init(); guard against close() being called without it
+        if (obsClient != null) {
+            obsClient.close();
+        }
+    }
+
+    @Override
+    public void createTenantDirIfNotExists(String tenantCode) throws Exception 
{
+        mkdir(tenantCode, getObsResDir(tenantCode));
+        mkdir(tenantCode, getObsUdfDir(tenantCode));
+    }
+
+    @Override
+    public String getResDir(String tenantCode) {
+        return getObsResDir(tenantCode) + FOLDER_SEPARATOR;
+    }
+
+    @Override
+    public String getUdfDir(String tenantCode) {
+        return getObsUdfDir(tenantCode) + FOLDER_SEPARATOR;
+    }
+
+    @Override
+    public boolean mkdir(String tenantCode, String path) throws IOException {
+        final String key = path + FOLDER_SEPARATOR;
+        if (!obsClient.doesObjectExist(bucketName, key)) {
+            createObsPrefix(bucketName, key);
+        }
+        return true;
+    }
+
+    /**
+     * Creates an empty object under {@code key}; {@link #mkdir(String, String)} uses it to
+     * materialise a "directory" marker.
+     *
+     * @param bucketName bucket to write into
+     * @param key        object key to create
+     */
+    protected void createObsPrefix(final String bucketName, final String key) {
+        ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setContentLength(0L);
+        InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
+        PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, emptyContent);
+        // attach the metadata; previously it was built but never sent with the request
+        putObjectRequest.setMetadata(metadata);
+        obsClient.putObject(putObjectRequest);
+    }
+
+    @Override
+    public String getResourceFullName(String tenantCode, String fileName) {
+        if (fileName.startsWith(FOLDER_SEPARATOR)) {
+            fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
+        }
+        return String.format(FORMAT_S_S, getObsResDir(tenantCode), fileName);
+    }
+
+    /**
+     * Strips the tenant's resource directory from a full resource name.
+     *
+     * @param tenantCode tenant whose resource directory is removed
+     * @param fullName   full object key of the resource
+     * @return {@code fullName} with the first occurrence of the resource directory removed
+     */
+    @Override
+    public String getResourceFileName(String tenantCode, String fullName) {
+        String resDir = getResDir(tenantCode);
+        // quote the directory so characters such as '.' or '+' in it are matched literally, not as regex
+        return fullName.replaceFirst(java.util.regex.Pattern.quote(resDir), "");
+    }
+
+    @Override
+    public String getFileName(ResourceType resourceType, String tenantCode, 
String fileName) {
+        if (fileName.startsWith(FOLDER_SEPARATOR)) {
+            fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
+        }
+        return getDir(resourceType, tenantCode) + fileName;
+    }
+
+    /**
+     * Deletes a resource together with the given child keys in one batch request.
+     *
+     * @param fullName         key of the resource itself
+     * @param childrenPathList keys of the children to delete; not modified by this method
+     * @param recursive        not used by this implementation
+     * @return {@code true} if the batch delete call completed, {@code false} if it threw
+     */
+    @Override
+    public boolean delete(String fullName, List<String> childrenPathList, boolean recursive) throws IOException {
+        DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName);
+        // add the keys straight to the request instead of appending fullName to the caller's list,
+        // which was a hidden side effect and failed for immutable or null lists
+        if (childrenPathList != null) {
+            for (String deleteKey : childrenPathList) {
+                deleteObjectsRequest.addKeyAndVersion(deleteKey);
+            }
+        }
+        deleteObjectsRequest.addKeyAndVersion(fullName);
+
+        try {
+            obsClient.deleteObjects(deleteObjectsRequest);
+        } catch (Exception e) {
+            log.error("delete objects error", e);
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Downloads an object from the bucket to a local file.
+     *
+     * @param tenantCode  not used by this implementation
+     * @param srcFilePath object key to download
+     * @param dstFilePath local destination path; a directory at this path is deleted first,
+     *                    otherwise missing parent directories are created
+     * @param overwrite   not used by this implementation
+     * @throws IOException if the object cannot be fetched or the local file cannot be written
+     */
+    @Override
+    public void download(String tenantCode, String srcFilePath, String dstFilePath,
+                         boolean overwrite) throws IOException {
+        File dstFile = new File(dstFilePath);
+        if (dstFile.isDirectory()) {
+            Files.delete(dstFile.toPath());
+        } else {
+            // a bare file name has no parent; nothing to create in that case
+            File parentDir = dstFile.getParentFile();
+            if (parentDir != null) {
+                Files.createDirectories(parentDir.toPath());
+            }
+        }
+        // fetch the object inside the try so an ObsException from getObject is wrapped as well
+        try (
+                InputStream obsInputStream = obsClient.getObject(bucketName, srcFilePath).getObjectContent();
+                FileOutputStream fos = new FileOutputStream(dstFilePath)) {
+            byte[] readBuf = new byte[1024];
+            int readLen;
+            // read() signals end of stream with -1
+            while ((readLen = obsInputStream.read(readBuf)) != -1) {
+                fos.write(readBuf, 0, readLen);
+            }
+        } catch (ObsException e) {
+            throw new IOException(e);
+        } catch (FileNotFoundException e) {
+            log.error("cannot find the destination file {}", dstFilePath);
+            throw e;
+        }
+    }
+
+    @Override
+    public boolean exists(String fileName) throws IOException {
+        return obsClient.doesObjectExist(bucketName, fileName);
+    }
+
+    @Override
+    public boolean delete(String filePath, boolean recursive) throws 
IOException {
+        try {
+            obsClient.deleteObject(bucketName, filePath);
+            return true;
+        } catch (ObsException e) {
+            log.error("fail to delete the object, the resource path is {}", 
filePath, e);
+            return false;
+        }
+    }
+
+    @Override
+    public boolean copy(String srcPath, String dstPath, boolean deleteSource, 
boolean overwrite) throws IOException {
+        obsClient.copyObject(bucketName, srcPath, bucketName, dstPath);
+        if (deleteSource) {
+            obsClient.deleteObject(bucketName, srcPath);
+        }
+        return true;
+    }
+
+    @Override
+    public String getDir(ResourceType resourceType, String tenantCode) {
+        switch (resourceType) {
+            case UDF:
+                return getUdfDir(tenantCode);
+            case FILE:
+                return getResDir(tenantCode);
+            case ALL:
+                return getObsDataBasePath();
+            default:
+                return "";
+        }
+    }
+
+    @Override
+    public boolean upload(String tenantCode, String srcFile, String dstPath, 
boolean deleteSource,
+                          boolean overwrite) throws IOException {
+        try {
+            obsClient.putObject(bucketName, dstPath, new File(srcFile));
+            if (deleteSource) {
+                Files.delete(Paths.get(srcFile));
+            }
+            return true;
+        } catch (ObsException e) {
+            log.error("upload failed, the bucketName is {}, the filePath is 
{}", bucketName, dstPath, e);
+            return false;
+        }
+    }
+
+    @Override
+    public List<String> vimFile(String tenantCode, String filePath, int 
skipLineNums, int limit) throws IOException {
+        if (StringUtils.isBlank(filePath)) {
+            log.error("file path:{} is empty", filePath);
+            return Collections.emptyList();
+        }
+        ObsObject obsObject = obsClient.getObject(bucketName, filePath);
+        try (BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(obsObject.getObjectContent()))) {
+            Stream<String> stream = 
bufferedReader.lines().skip(skipLineNums).limit(limit);
+            return stream.collect(Collectors.toList());
+        }
+    }
+
+    @Override
+    public ResUploadType returnStorageType() {
+        return ResUploadType.OBS;
+    }
+
+    @Override
+    public List<StorageEntity> listFilesStatusRecursively(String path, String 
defaultPath, String tenantCode,
+                                                          ResourceType type) {
+        List<StorageEntity> storageEntityList = new ArrayList<>();
+        LinkedList<StorageEntity> foldersToFetch = new LinkedList<>();
+
+        StorageEntity initialEntity = null;
+        try {
+            initialEntity = getFileStatus(path, defaultPath, tenantCode, type);
+        } catch (Exception e) {
+            log.error("error while listing files status recursively, path: 
{}", path, e);
+            return storageEntityList;
+        }
+        foldersToFetch.add(initialEntity);
+
+        while (!foldersToFetch.isEmpty()) {
+            String pathToExplore = foldersToFetch.pop().getFullName();
+            try {
+                List<StorageEntity> tempList = listFilesStatus(pathToExplore, 
defaultPath, tenantCode, type);
+                for (StorageEntity temp : tempList) {
+                    if (temp.isDirectory()) {
+                        foldersToFetch.add(temp);
+                    }
+                }
+                storageEntityList.addAll(tempList);
+            } catch (Exception e) {
+                log.error("error while listing files stat:wus recursively, 
path: {}", pathToExplore, e);
+            }
+        }
+
+        return storageEntityList;
+    }
+
+    @Override
+    public List<StorageEntity> listFilesStatus(String path, String 
defaultPath, String tenantCode,
+                                               ResourceType type) throws 
Exception {
+        List<StorageEntity> storageEntityList = new ArrayList<>();
+
+        ListObjectsRequest request = new ListObjectsRequest();
+        request.setBucketName(bucketName);
+        request.setPrefix(path);
+        request.setDelimiter(FOLDER_SEPARATOR);
+        ObjectListing result = null;
+        try {
+            result = obsClient.listObjects(request);
+        } catch (Exception e) {
+            throw new ServiceException("Get ObsClient file list exception", e);
+        }
+
+        while (result != null) {
+            String nextMarker = result.getNextMarker();
+            List<ObsObject> objects = result.getObjects();
+
+            for (ObsObject object : objects) {
+                if (!object.getObjectKey().endsWith(FOLDER_SEPARATOR)) {
+                    // the path is a file
+                    String[] aliasArr = 
object.getObjectKey().split(FOLDER_SEPARATOR);
+                    String alias = aliasArr[aliasArr.length - 1];
+                    String fileName = StringUtils.difference(defaultPath, 
object.getObjectKey());
+
+                    StorageEntity entity = new StorageEntity();
+                    ObjectMetadata metadata = object.getMetadata();
+                    entity.setAlias(alias);
+                    entity.setFileName(fileName);
+                    entity.setFullName(object.getObjectKey());
+                    entity.setDirectory(false);
+                    entity.setUserName(tenantCode);
+                    entity.setType(type);
+                    entity.setSize(metadata.getContentLength());
+                    entity.setCreateTime(metadata.getLastModified());
+                    entity.setUpdateTime(metadata.getLastModified());
+                    entity.setPfullName(path);
+
+                    storageEntityList.add(entity);
+                }
+            }
+
+            for (String commonPrefix : result.getCommonPrefixes()) {
+                // the paths in commonPrefix are directories
+                String suffix = StringUtils.difference(path, commonPrefix);
+                String fileName = StringUtils.difference(defaultPath, 
commonPrefix);
+
+                StorageEntity entity = new StorageEntity();
+                entity.setAlias(suffix);
+                entity.setFileName(fileName);
+                entity.setFullName(commonPrefix);
+                entity.setDirectory(true);
+                entity.setUserName(tenantCode);
+                entity.setType(type);
+                entity.setSize(0);
+                entity.setCreateTime(null);
+                entity.setUpdateTime(null);
+                entity.setPfullName(path);
+
+                storageEntityList.add(entity);
+            }
+
+            if (!StringUtils.isNotBlank(nextMarker)) {
+                break;
+            }
+
+            request.setMarker(nextMarker);
+            try {
+                result = obsClient.listObjects(request);
+            } catch (Exception e) {
+                throw new ServiceException("Get ObsClient file list 
exception", e);
+            }
+        }
+        return storageEntityList;
+    }
+
+    @Override
+    public StorageEntity getFileStatus(String path, String defaultPath, String 
tenantCode,
+                                       ResourceType type) throws Exception {
+
+        if (path.endsWith(FOLDER_SEPARATOR)) {
+            // the path is a directory that may or may not exist in ObsClient
+            String alias = findDirAlias(path);
+            String fileName = StringUtils.difference(defaultPath, path);
+
+            StorageEntity entity = new StorageEntity();
+            entity.setAlias(alias);
+            entity.setFileName(fileName);
+            entity.setFullName(path);
+            entity.setDirectory(true);
+            entity.setUserName(tenantCode);
+            entity.setType(type);
+            entity.setSize(0);
+
+            return entity;
+
+        } else {
+            GetObjectRequest request = new GetObjectRequest();
+            request.setBucketName(bucketName);
+            request.setObjectKey(path);
+            ObsObject object;
+            try {
+                object = obsClient.getObject(request);
+            } catch (Exception e) {
+                throw new ServiceException("Get ObsClient file list 
exception", e);
+            }
+
+            String[] aliasArr = object.getObjectKey().split(FOLDER_SEPARATOR);
+            String alias = aliasArr[aliasArr.length - 1];
+            String fileName = StringUtils.difference(defaultPath, 
object.getObjectKey());
+
+            StorageEntity entity = new StorageEntity();
+            ObjectMetadata metadata = object.getMetadata();
+            entity.setAlias(alias);
+            entity.setFileName(fileName);
+            entity.setFullName(object.getObjectKey());
+            entity.setDirectory(false);
+            entity.setUserName(tenantCode);
+            entity.setType(type);
+            entity.setSize(metadata.getContentLength());
+            entity.setCreateTime(metadata.getLastModified());
+            entity.setUpdateTime(metadata.getLastModified());
+
+            return entity;
+        }
+
+    }
+
+    @Override
+    public void deleteTenant(String tenantCode) throws Exception {
+        deleteTenantCode(tenantCode);
+    }
+
+    public String getObsResDir(String tenantCode) {
+        return String.format("%s/" + RESOURCE_TYPE_FILE, 
getObsTenantDir(tenantCode));
+    }
+
+    public String getObsUdfDir(String tenantCode) {
+        return String.format("%s/" + RESOURCE_TYPE_UDF, 
getObsTenantDir(tenantCode));
+    }
+
+    public String getObsTenantDir(String tenantCode) {
+        return String.format(FORMAT_S_S, getObsDataBasePath(), tenantCode);
+    }
+
+    /**
+     * Returns the base key prefix derived from {@code RESOURCE_UPLOAD_PATH}.
+     *
+     * @return an empty string when the upload path is just the folder separator, otherwise the
+     *         upload path without its leading separator
+     */
+    public String getObsDataBasePath() {
+        if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) {
+            return "";
+        }
+        // strip only a leading separator; replaceFirst removed the first separator anywhere,
+        // which corrupted paths that had no leading separator
+        if (RESOURCE_UPLOAD_PATH.startsWith(FOLDER_SEPARATOR)) {
+            return RESOURCE_UPLOAD_PATH.substring(FOLDER_SEPARATOR.length());
+        }
+        return RESOURCE_UPLOAD_PATH;
+    }
+
+    protected void deleteTenantCode(String tenantCode) {
+        deleteDir(getResDir(tenantCode));
+        deleteDir(getUdfDir(tenantCode));
+    }
+
+    /**
+     * Verifies that a bucket name is configured and that the bucket exists.
+     *
+     * @param bucketName bucket to check
+     * @throws IllegalArgumentException if the name is blank or the bucket is not found
+     */
+    public void ensureBucketSuccessfullyCreated(String bucketName) {
+        if (StringUtils.isBlank(bucketName)) {
+            // report the property that readObsBucketName() reads, not the Alibaba one
+            throw new IllegalArgumentException(Constants.HUAWEI_CLOUD_OBS_BUCKET_NAME + " is empty");
+        }
+
+        boolean existsBucket = obsClient.headBucket(bucketName);
+        if (!existsBucket) {
+            throw new IllegalArgumentException(
+                    "bucketName: " + bucketName + " is not exists, you need to create them by yourself");
+        }
+
+        log.info("bucketName: {} has been found", bucketName);
+    }
+
+    /**
+     * Deletes the object whose key equals {@code directoryName}, if such an object exists.
+     *
+     * <p>NOTE(review): only that single key is deleted here; objects stored under the prefix are
+     * not touched by this method. Confirm whether tenant cleanup should also remove them.
+     *
+     * @param directoryName key of the directory object to delete
+     */
+    protected void deleteDir(String directoryName) {
+        if (obsClient.doesObjectExist(bucketName, directoryName)) {
+            obsClient.deleteObject(bucketName, directoryName);
+        }
+    }
+
+    protected ObsClient buildObsClient() {
+        return new ObsClient(accessKeyId, accessKeySecret, endPoint);
+    }
+
+    /**
+     * Derives the display alias of a directory key: its last path element followed by the
+     * folder separator.
+     *
+     * @param dirPath directory key
+     * @return {@code dirPath} unchanged when it does not end with the folder separator or has no
+     *         name element; otherwise the last element plus the separator
+     */
+    private String findDirAlias(String dirPath) {
+        if (!dirPath.endsWith(FOLDER_SEPARATOR)) {
+            return dirPath;
+        }
+
+        Path path = Paths.get(dirPath);
+        int nameCount = path.getNameCount();
+        if (nameCount == 0) {
+            // a root-only path has no name element; getName(-1) would throw
+            return dirPath;
+        }
+        return path.getName(nameCount - 1) + FOLDER_SEPARATOR;
+    }
+}
diff --git 
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorFactory.java
similarity index 51%
copy from 
dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
copy to 
dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorFactory.java
index 33c34f4c53..2e67103931 100644
--- 
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
+++ 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorFactory.java
@@ -15,42 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.plugin.storage.api;
+package org.apache.dolphinscheduler.plugin.storage.obs;
 
-import java.util.Optional;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperateFactory;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageType;
 
-public enum StorageType {
+import com.google.auto.service.AutoService;
 
-    LOCAL(0, "LOCAL"),
-    HDFS(1, "HDFS"),
-    OSS(2, "OSS"),
-    S3(3, "S3"),
-    GCS(4, "GCS"),
+@AutoService({StorageOperateFactory.class})
+public class ObsStorageOperatorFactory implements StorageOperateFactory {
 
-    ABS(5, "ABS");
-
-    private final int code;
-    private final String name;
-
-    StorageType(int code, String name) {
-        this.code = code;
-        this.name = name;
-    }
-
-    public int getCode() {
-        return code;
+    public ObsStorageOperatorFactory() {
     }
 
-    public String getName() {
-        return name;
+    @Override
+    public StorageOperate createStorageOperate() {
+        ObsStorageOperator ossOperator = new ObsStorageOperator();
+        ossOperator.init();
+        return ossOperator;
     }
 
-    public static Optional<StorageType> getStorageType(String name) {
-        for (StorageType storageType : StorageType.values()) {
-            if (storageType.getName().equals(name)) {
-                return Optional.of(storageType);
-            }
-        }
-        return Optional.empty();
+    @Override
+    public StorageType getStorageOperate() {
+        return StorageType.OBS;
     }
 }
diff --git 
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorTest.java
 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorTest.java
new file mode 100644
index 0000000000..4aabdb6bd8
--- /dev/null
+++ 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.storage.obs;
+
+import static 
org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
+import static 
org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.obs.services.ObsClient;
+
+@ExtendWith(MockitoExtension.class)
+public class ObsStorageOperatorTest {
+
+    private static final String ACCESS_KEY_ID_MOCK = "ACCESS_KEY_ID_MOCK";
+    private static final String ACCESS_KEY_SECRET_MOCK = 
"ACCESS_KEY_SECRET_MOCK";
+    private static final String END_POINT_MOCK = "END_POINT_MOCK";
+    private static final String BUCKET_NAME_MOCK = "BUCKET_NAME_MOCK";
+    private static final String TENANT_CODE_MOCK = "TENANT_CODE_MOCK";
+    private static final String DIR_MOCK = "DIR_MOCK";
+    private static final String FILE_NAME_MOCK = "FILE_NAME_MOCK";
+    private static final String FILE_PATH_MOCK = "FILE_PATH_MOCK";
+    private static final String FULL_NAME = "/tmp/dir1/";
+
+    private static final String DEFAULT_PATH = "/tmp/";
+    @Mock
+    private ObsClient obsClientMock;
+
+    private ObsStorageOperator obsOperator;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        obsOperator = spy(new ObsStorageOperator());
+        doReturn(ACCESS_KEY_ID_MOCK).when(obsOperator)
+                .readObsAccessKeyID();
+        doReturn(ACCESS_KEY_SECRET_MOCK).when(obsOperator)
+                .readObsAccessKeySecret();
+        doReturn(BUCKET_NAME_MOCK).when(obsOperator).readObsBucketName();
+        doReturn(END_POINT_MOCK).when(obsOperator).readObsEndPoint();
+        doReturn(obsClientMock).when(obsOperator).buildObsClient();
+        doNothing().when(obsOperator).ensureBucketSuccessfullyCreated(any());
+
+        obsOperator.init();
+
+    }
+
+    @Test
+    public void initObsOperator() {
+        verify(obsOperator, times(1)).buildObsClient();
+        Assertions.assertEquals(ACCESS_KEY_ID_MOCK, 
obsOperator.getAccessKeyId());
+        Assertions.assertEquals(ACCESS_KEY_SECRET_MOCK, 
obsOperator.getAccessKeySecret());
+        Assertions.assertEquals(BUCKET_NAME_MOCK, obsOperator.getBucketName());
+    }
+
+    @Test
+    public void tearDownObsOperator() throws IOException {
+        doNothing().when(obsClientMock).close();
+        obsOperator.close();
+        verify(obsClientMock, times(1)).close();
+    }
+
+    @Test
+    public void createTenantResAndUdfDir() throws Exception {
+        doReturn(DIR_MOCK).when(obsOperator).getObsResDir(TENANT_CODE_MOCK);
+        doReturn(DIR_MOCK).when(obsOperator).getObsUdfDir(TENANT_CODE_MOCK);
+        doReturn(true).when(obsOperator).mkdir(TENANT_CODE_MOCK, DIR_MOCK);
+        obsOperator.createTenantDirIfNotExists(TENANT_CODE_MOCK);
+        verify(obsOperator, times(2)).mkdir(TENANT_CODE_MOCK, DIR_MOCK);
+    }
+
+    @Test
+    public void getResDir() {
+        final String expectedResourceDir = 
String.format("dolphinscheduler/%s/resources/", TENANT_CODE_MOCK);
+        final String dir = obsOperator.getResDir(TENANT_CODE_MOCK);
+        Assertions.assertEquals(expectedResourceDir, dir);
+    }
+
+    @Test
+    public void getUdfDir() {
+        final String expectedUdfDir = 
String.format("dolphinscheduler/%s/udfs/", TENANT_CODE_MOCK);
+        final String dir = obsOperator.getUdfDir(TENANT_CODE_MOCK);
+        Assertions.assertEquals(expectedUdfDir, dir);
+    }
+
+    @Test
+    public void mkdirWhenDirExists() {
+        boolean isSuccess = false;
+        try {
+            final String key = DIR_MOCK + FOLDER_SEPARATOR;
+            
doReturn(true).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, key);
+            isSuccess = obsOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK);
+            verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, 
key);
+
+        } catch (IOException e) {
+            Assertions.fail("test failed due to unexpected IO exception");
+        }
+
+        Assertions.assertTrue(isSuccess);
+    }
+
+    @Test
+    public void mkdirWhenDirNotExists() {
+        boolean isSuccess = true;
+        try {
+            final String key = DIR_MOCK + FOLDER_SEPARATOR;
+            
doReturn(false).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, key);
+            doNothing().when(obsOperator).createObsPrefix(BUCKET_NAME_MOCK, 
key);
+            isSuccess = obsOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK);
+            verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, 
key);
+            verify(obsOperator, times(1)).createObsPrefix(BUCKET_NAME_MOCK, 
key);
+
+        } catch (IOException e) {
+            Assertions.fail("test failed due to unexpected IO exception");
+        }
+
+        Assertions.assertTrue(isSuccess);
+    }
+
+    @Test
+    public void getResourceFullName() {
+        final String expectedResourceFullName =
+                String.format("dolphinscheduler/%s/resources/%s", 
TENANT_CODE_MOCK, FILE_NAME_MOCK);
+        final String resourceFullName = 
obsOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK);
+        Assertions.assertEquals(expectedResourceFullName, resourceFullName);
+    }
+
+    @Test
+    public void getResourceFileName() {
+        final String expectedResourceFileName = FILE_NAME_MOCK;
+        final String resourceFullName =
+                String.format("dolphinscheduler/%s/resources/%s", 
TENANT_CODE_MOCK, FILE_NAME_MOCK);
+        final String resourceFileName = 
obsOperator.getResourceFileName(TENANT_CODE_MOCK, resourceFullName);
+        Assertions.assertEquals(expectedResourceFileName, resourceFileName);
+    }
+
+    @Test
+    public void getFileName() {
+        final String expectedFileName =
+                String.format("dolphinscheduler/%s/resources/%s", 
TENANT_CODE_MOCK, FILE_NAME_MOCK);
+        final String fileName = obsOperator.getFileName(ResourceType.FILE, 
TENANT_CODE_MOCK, FILE_NAME_MOCK);
+        Assertions.assertEquals(expectedFileName, fileName);
+    }
+
+    @Test
+    public void exists() {
+        boolean doesExist = false;
+        doReturn(true).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, 
FILE_NAME_MOCK);
+        try {
+            doesExist = obsOperator.exists(FILE_NAME_MOCK);
+        } catch (IOException e) {
+            Assertions.fail("unexpected IO exception in unit test");
+        }
+
+        Assertions.assertTrue(doesExist);
+        verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, 
FILE_NAME_MOCK);
+    }
+
+    @Test
+    public void delete() {
+        boolean isDeleted = false;
+        doReturn(null).when(obsClientMock).deleteObject(anyString(), 
anyString());
+        try {
+            isDeleted = obsOperator.delete(FILE_NAME_MOCK, true);
+        } catch (IOException e) {
+            Assertions.fail("unexpected IO exception in unit test");
+        }
+
+        Assertions.assertTrue(isDeleted);
+        verify(obsClientMock, times(1)).deleteObject(anyString(), anyString());
+    }
+
+    @Test
+    public void copy() {
+        boolean isSuccess = false;
+        doReturn(null).when(obsClientMock).copyObject(anyString(), 
anyString(), anyString(), anyString());
+        try {
+            isSuccess = obsOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, 
false, false);
+        } catch (IOException e) {
+            Assertions.fail("unexpected IO exception in unit test");
+        }
+
+        Assertions.assertTrue(isSuccess);
+        verify(obsClientMock, times(1)).copyObject(anyString(), anyString(), 
anyString(), anyString());
+    }
+
+    @Test
+    public void deleteTenant() {
+        doNothing().when(obsOperator).deleteTenantCode(anyString());
+        try {
+            obsOperator.deleteTenant(TENANT_CODE_MOCK);
+        } catch (Exception e) {
+            Assertions.fail("unexpected exception caught in unit test");
+        }
+
+        verify(obsOperator, times(1)).deleteTenantCode(anyString());
+    }
+
+    @Test
+    public void getObsResDir() {
+        final String expectedObsResDir = 
String.format("dolphinscheduler/%s/resources", TENANT_CODE_MOCK);
+        final String obsResDir = obsOperator.getObsResDir(TENANT_CODE_MOCK);
+        Assertions.assertEquals(expectedObsResDir, obsResDir);
+    }
+
+    @Test
+    public void getObsUdfDir() {
+        final String expectedObsUdfDir = 
String.format("dolphinscheduler/%s/udfs", TENANT_CODE_MOCK);
+        final String obsUdfDir = obsOperator.getObsUdfDir(TENANT_CODE_MOCK);
+        Assertions.assertEquals(expectedObsUdfDir, obsUdfDir);
+    }
+
+    @Test
+    public void getObsTenantDir() {
+        final String expectedObsTenantDir = String.format(FORMAT_S_S, 
DIR_MOCK, TENANT_CODE_MOCK);
+        doReturn(DIR_MOCK).when(obsOperator).getObsDataBasePath();
+        final String obsTenantDir = 
obsOperator.getObsTenantDir(TENANT_CODE_MOCK);
+        Assertions.assertEquals(expectedObsTenantDir, obsTenantDir);
+    }
+
+    @Test
+    public void deleteDir() {
+        doReturn(true).when(obsClientMock).doesObjectExist(anyString(), 
anyString());
+        obsOperator.deleteDir(DIR_MOCK);
+        verify(obsClientMock, times(1)).deleteObject(anyString(), anyString());
+    }
+
+    @Test
+    public void testGetFileStatus() throws Exception {
+        StorageEntity entity = obsOperator.getFileStatus(FULL_NAME, 
DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
+        Assertions.assertEquals(FULL_NAME, entity.getFullName());
+        Assertions.assertEquals("dir1/", entity.getFileName());
+    }
+
+    @Test
+    public void testListFilesStatus() throws Exception {
+        List<StorageEntity> result =
+                
obsOperator.listFilesStatus("dolphinscheduler/default/resources/",
+                        "dolphinscheduler/default/resources/",
+                        "default", ResourceType.FILE);
+        Assertions.assertEquals(0, result.size());
+    }
+
+    /**
+     * With the starting entity stubbed and every listing stubbed as empty, the recursive
+     * listing must return no entries.
+     */
+    @Test
+    public void testListFilesStatusRecursively() throws Exception {
+        StorageEntity entity = new StorageEntity();
+        entity.setFullName(FULL_NAME);
+
+        doReturn(entity).when(obsOperator).getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK,
+                ResourceType.FILE);
+        // Collections.emptyList() is the type-safe replacement for the raw EMPTY_LIST constant
+        doReturn(Collections.emptyList()).when(obsOperator).listFilesStatus(anyString(), anyString(), anyString(),
+                Mockito.any(ResourceType.class));
+
+        List<StorageEntity> result =
+                obsOperator.listFilesStatusRecursively(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
+        Assertions.assertEquals(0, result.size());
+    }
+}
diff --git a/dolphinscheduler-storage-plugin/pom.xml 
b/dolphinscheduler-storage-plugin/pom.xml
index 7e27358a64..a6a86bc2a6 100644
--- a/dolphinscheduler-storage-plugin/pom.xml
+++ b/dolphinscheduler-storage-plugin/pom.xml
@@ -35,6 +35,7 @@
         <module>dolphinscheduler-storage-oss</module>
         <module>dolphinscheduler-storage-gcs</module>
         <module>dolphinscheduler-storage-abs</module>
+        <module>dolphinscheduler-storage-obs</module>
     </modules>
 
     <dependencyManagement>
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index 2d314382c5..ab46bb58d5 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -475,6 +475,12 @@ public class TaskConstants {
     public static final String ALIBABA_CLOUD_ACCESS_KEY_SECRET = 
"resource.alibaba.cloud.access.key.secret";
     public static final String ALIBABA_CLOUD_REGION = 
"resource.alibaba.cloud.region";
 
+    /**
+     * huawei cloud config
+     */
+    public static final String HUAWEI_CLOUD_ACCESS_KEY_ID = 
"resource.huawei.cloud.access.key.id";
+    public static final String HUAWEI_CLOUD_ACCESS_KEY_SECRET = 
"resource.huawei.cloud.access.key.secret";
+
     /**
      * use for k8s task
      */
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index 28d2f1fe70..23fb7919d9 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -495,3 +495,4 @@ snowflake-jdbc-3.13.29.jar
 azure-storage-blob-12.21.0.jar
 azure-storage-internal-avro-12.6.0.jar
 vertica-jdbc-12.0.4-0.jar
+esdk-obs-java-bundle-3.23.3.jar
\ No newline at end of file

Reply via email to