This is an automated email from the ASF dual-hosted git repository.
xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 25480ae9e7 [Feature][Resource Center] Add support for Huawei Cloud OBS
(#14643)
25480ae9e7 is described below
commit 25480ae9e75d900a969bd2df67c6e136089b6691
Author: Edison Catto <[email protected]>
AuthorDate: Mon Aug 14 13:25:48 2023 +0800
[Feature][Resource Center] Add support for Huawei Cloud OBS (#14643)
* [Feature][Resource Center] Add support for Huawei Cloud OBS as storage of
resource center
* add license and add doc
* add 3-party dependency license
* Update LICENSE
* fix
* Update pom.xml
* fix
* fix
---------
Co-authored-by: sunkang <[email protected]>
Co-authored-by: xiangzihao <[email protected]>
Co-authored-by: Rick Cheng <[email protected]>
---
docs/docs/en/guide/resource/configuration.md | 13 +-
docs/docs/zh/guide/resource/configuration.md | 13 +-
dolphinscheduler-bom/pom.xml | 18 +
dolphinscheduler-common/pom.xml | 5 +
.../common/constants/Constants.java | 3 +
.../common/enums/ResUploadType.java | 2 +-
.../src/main/resources/common.properties | 13 +-
dolphinscheduler-dist/release-docs/LICENSE | 1 +
.../licenses/LICENSE-esdk-obs-java-bundle.txt | 202 ++++++++
.../dolphinscheduler-storage-all/pom.xml | 5 +
.../plugin/storage/api/StorageType.java | 4 +-
.../pom.xml | 28 +-
.../plugin/storage/obs/ObsStorageOperator.java | 513 +++++++++++++++++++++
.../storage/obs/ObsStorageOperatorFactory.java} | 45 +-
.../plugin/storage/obs/ObsStorageOperatorTest.java | 291 ++++++++++++
dolphinscheduler-storage-plugin/pom.xml | 1 +
.../plugin/task/api/TaskConstants.java | 6 +
tools/dependencies/known-dependencies.txt | 1 +
18 files changed, 1106 insertions(+), 58 deletions(-)
diff --git a/docs/docs/en/guide/resource/configuration.md
b/docs/docs/en/guide/resource/configuration.md
index d8e13770b2..a4bb8cef42 100644
--- a/docs/docs/en/guide/resource/configuration.md
+++ b/docs/docs/en/guide/resource/configuration.md
@@ -1,7 +1,7 @@
# Resource Center Configuration
- You could use `Resource Center` to upload text files, UDFs and other
task-related files.
-- You could configure `Resource Center` to use distributed file system like
[Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+),
[MinIO](https://github.com/minio/minio) cluster or remote storage products like
[AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud
OSS](https://www.aliyun.com/product/oss), etc.
+- You could configure `Resource Center` to use distributed file system like
[Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+),
[MinIO](https://github.com/minio/minio) cluster or remote storage products like
[AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud
OSS](https://www.aliyun.com/product/oss), [Huawei Cloud
OBS](https://support.huaweicloud.com/obs/index.html), etc.
- You could configure `Resource Center` to use local file system. If you
deploy `DolphinScheduler` in `Standalone` mode, you could configure it to use
local file system for `Resource Center` without the need of an external `HDFS`
system or `S3`.
- Furthermore, if you deploy `DolphinScheduler` in `Cluster` mode, you could
use [S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse) to mount `S3` or
[JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html) to mount
`OSS` to your machines and use the local file system for `Resource Center`. In
this way, you could operate remote files as if on your local machines.
@@ -80,7 +80,7 @@ data.basedir.path=/tmp/dolphinscheduler
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
-# resource storage type: HDFS, S3, OSS, GCS, ABS, NONE
+# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
resource.storage.type=NONE
# resource store on HDFS/S3/OSS path, resource file will store to this base
path, self configuration, please make sure the directory exists on hdfs and
have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/tmp/dolphinscheduler
@@ -107,6 +107,15 @@ resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
# oss bucket endpoint, required if you set resource.storage.type=OSS
resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com
+# huawei cloud access key id, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.id=<your-access-key-id>
+# huawei cloud access key secret, required if you set
resource.storage.type=OBS
+resource.huawei.cloud.access.key.secret=<your-access-key-secret>
+# obs bucket name, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.bucket.name=dolphinscheduler
+# obs bucket endpoint, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
+
# if resource.storage.type=HDFS, the user must have the permission to create
directories under the HDFS root path
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if
resource.storage.type=HDFS and namenode HA is enabled, you need to copy
core-site.xml and hdfs-site.xml to conf dir
diff --git a/docs/docs/zh/guide/resource/configuration.md
b/docs/docs/zh/guide/resource/configuration.md
index 43ee13966e..25bacf90a0 100644
--- a/docs/docs/zh/guide/resource/configuration.md
+++ b/docs/docs/zh/guide/resource/configuration.md
@@ -1,7 +1,7 @@
# 资源中心配置详情
- 资源中心通常用于上传文件、UDF 函数,以及任务组管理等操作。
--
资源中心可以对接分布式的文件存储系统,如[Hadoop](https://hadoop.apache.org/docs/r2.7.0/)(2.6+)或者[MinIO](https://github.com/minio/minio)集群,也可以对接远端的对象存储,如[AWS
S3](https://aws.amazon.com/s3/)或者[阿里云
OSS](https://www.aliyun.com/product/oss)等。
+-
资源中心可以对接分布式的文件存储系统,如[Hadoop](https://hadoop.apache.org/docs/r2.7.0/)(2.6+)或者[MinIO](https://github.com/minio/minio)集群,也可以对接远端的对象存储,如[AWS
S3](https://aws.amazon.com/s3/)或者[阿里云
OSS](https://www.aliyun.com/product/oss),[华为云
OBS](https://support.huaweicloud.com/obs/index.html) 等。
- 资源中心也可以直接对接本地文件系统。在单机模式下,您无需依赖`Hadoop`或`S3`一类的外部存储系统,可以方便地对接本地文件系统进行体验。
-
除此之外,对于集群模式下的部署,您可以通过使用[S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse)将`S3`挂载到本地,或者使用[JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html)将`OSS`挂载到本地等,再用资源中心对接本地文件系统方式来操作远端对象存储中的文件。
@@ -79,7 +79,7 @@ resource.aws.s3.endpoint=
# user data local directory path, please make sure the directory exists and
have read write permissions
data.basedir.path=/tmp/dolphinscheduler
-# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS
+# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
resource.storage.type=LOCAL
# resource store on HDFS/S3/OSS path, resource file will store to this hadoop
hdfs path, self configuration,
@@ -108,6 +108,15 @@ resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
# oss bucket endpoint, required if you set resource.storage.type=OSS
resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com
+# huawei cloud access key id, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.id=<your-access-key-id>
+# huawei cloud access key secret, required if you set
resource.storage.type=OBS
+resource.huawei.cloud.access.key.secret=<your-access-key-secret>
+# obs bucket name, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.bucket.name=dolphinscheduler
+# obs bucket endpoint, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
+
# if resource.storage.type=HDFS, the user must have the permission to create
directories under the HDFS root path
resource.hdfs.root.user=root
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler;
diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml
index 7af68b6701..8bb9481fc8 100644
--- a/dolphinscheduler-bom/pom.xml
+++ b/dolphinscheduler-bom/pom.xml
@@ -115,6 +115,7 @@
<casdoor.version>1.6.0</casdoor.version>
<azure-sdk-bom.version>1.2.10</azure-sdk-bom.version>
<protobuf.version>3.17.2</protobuf.version>
+ <esdk-obs.version>3.23.3</esdk-obs.version>
<system-lambda.version>1.2.1</system-lambda.version>
</properties>
@@ -896,6 +897,23 @@
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>com.huaweicloud</groupId>
+ <artifactId>esdk-obs-java-bundle</artifactId>
+ <version>${esdk-obs.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-lambda</artifactId>
diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml
index 096306fa8c..0254e6c43a 100644
--- a/dolphinscheduler-common/pom.xml
+++ b/dolphinscheduler-common/pom.xml
@@ -93,6 +93,11 @@
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.huaweicloud</groupId>
+ <artifactId>esdk-obs-java-bundle</artifactId>
+ </dependency>
+
<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
index f2260c30cf..e98f5f202f 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
@@ -147,6 +147,9 @@ public final class Constants {
public static final String AZURE_BLOB_STORAGE_ACCOUNT_NAME =
"resource.azure.blob.storage.account.name";
+ public static final String HUAWEI_CLOUD_OBS_BUCKET_NAME =
"resource.huawei.cloud.obs.bucket.name";
+ public static final String HUAWEI_CLOUD_OBS_END_POINT =
"resource.huawei.cloud.obs.endpoint";
+
/**
* fetch applicationId way
*/
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java
index 2ebc2e1a5c..143cb12a35 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java
@@ -21,5 +21,5 @@ package org.apache.dolphinscheduler.common.enums;
* resource storage types
*/
public enum ResUploadType {
- LOCAL, HDFS, S3, OSS, GCS, ABS, NONE
+ LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
}
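The hunk above extends `ResUploadType` with `OBS`. A standalone sketch of how a configured `resource.storage.type` string could resolve onto this enum (the enum constants mirror the diff; the `resolve` helper and fallback-to-`NONE` behavior are illustrative, not the project's actual parsing code):

```java
public class ResUploadTypeDemo {

    // Mirrors the enum after this commit (OBS added before NONE).
    enum ResUploadType {
        LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
    }

    // Resolve a property value case-insensitively, falling back to NONE
    // for missing or unrecognized values.
    static ResUploadType resolve(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return ResUploadType.NONE;
        }
        try {
            return ResUploadType.valueOf(configured.trim().toUpperCase());
        } catch (IllegalArgumentException e) {
            return ResUploadType.NONE;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("OBS"));   // OBS
        System.out.println(resolve("bogus")); // NONE
    }
}
```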
diff --git a/dolphinscheduler-common/src/main/resources/common.properties
b/dolphinscheduler-common/src/main/resources/common.properties
index 43a1338152..ea4c126beb 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -21,7 +21,7 @@ data.basedir.path=/tmp/dolphinscheduler
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
-# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, NONE. LOCAL type is a
specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
+# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE. LOCAL type
is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///"
configuration
# please notice that LOCAL mode does not support reading and writing in
distributed mode, which means you can only use your resources on one machine,
unless you
# use a shared file system mount point
resource.storage.type=LOCAL
@@ -73,6 +73,17 @@ resource.azure.blob.storage.account.name=<your-account-name>
# abs connection string, required if you set resource.storage.type=ABS
resource.azure.blob.storage.connection.string=<your-connection-string>
+
+# huawei cloud access key id, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.id=<your-access-key-id>
+# huawei cloud access key secret, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.secret=<your-access-key-secret>
+# obs bucket name, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.bucket.name=dolphinscheduler
+# obs bucket endpoint, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
+
# if resource.storage.type=HDFS, the user must have the permission to create
directories under the HDFS root path
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if
resource.storage.type=HDFS and namenode HA is enabled, you need to copy
core-site.xml and hdfs-site.xml to conf dir
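The four OBS keys added to common.properties above can be read with plain `java.util.Properties`. A minimal self-contained sketch (the key names come from the diff; the class name and sample values are illustrative only):

```java
import java.io.StringReader;
import java.util.Properties;

public class ObsPropsDemo {

    // Sample mirroring the keys added in the common.properties hunk.
    static final String SAMPLE =
            "resource.storage.type=OBS\n"
          + "resource.huawei.cloud.access.key.id=<your-access-key-id>\n"
          + "resource.huawei.cloud.access.key.secret=<your-access-key-secret>\n"
          + "resource.huawei.cloud.obs.bucket.name=dolphinscheduler\n"
          + "resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com\n";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.load(new StringReader(SAMPLE));
        // Only consult the OBS keys when the storage type is OBS.
        if ("OBS".equals(props.getProperty("resource.storage.type"))) {
            System.out.println(props.getProperty("resource.huawei.cloud.obs.bucket.name"));
            System.out.println(props.getProperty("resource.huawei.cloud.obs.endpoint"));
        }
    }
}
```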
diff --git a/dolphinscheduler-dist/release-docs/LICENSE
b/dolphinscheduler-dist/release-docs/LICENSE
index ffc1a39ec6..ab26aeda4a 100644
--- a/dolphinscheduler-dist/release-docs/LICENSE
+++ b/dolphinscheduler-dist/release-docs/LICENSE
@@ -567,6 +567,7 @@ The text of each license is also included at
licenses/LICENSE-[project].txt.
casdoor-spring-boot-starter 1.6.0
https://mvnrepository.com/artifact/org.casbin/casdoor-spring-boot-starter/1.6.0
Apache 2.0
org.apache.oltu.oauth2.client 1.0.2
https://mvnrepository.com/artifact/org.apache.oltu.oauth2/org.apache.oltu.oauth2.client/1.0.2
Apache 2.0
org.apache.oltu.oauth2.common 1.0.2
https://mvnrepository.com/artifact/org.apache.oltu.oauth2/org.apache.oltu.oauth2.common/1.0.2
Apache 2.0
+ esdk-obs-java-bundle 3.23.3
https://mvnrepository.com/artifact/com.huaweicloud/esdk-obs-java-bundle/3.23.3
Apache 2.0
diff --git
a/dolphinscheduler-dist/release-docs/licenses/LICENSE-esdk-obs-java-bundle.txt
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-esdk-obs-java-bundle.txt
new file mode 100644
index 0000000000..d645695673
--- /dev/null
+++
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-esdk-obs-java-bundle.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
index a85fcd3c4e..4d6edcd1cc 100644
--- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
@@ -57,6 +57,11 @@
<artifactId>dolphinscheduler-storage-abs</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-storage-obs</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
index 33c34f4c53..7ead2c8a94 100644
---
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
+++
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
@@ -27,7 +27,9 @@ public enum StorageType {
S3(3, "S3"),
GCS(4, "GCS"),
- ABS(5, "ABS");
+ ABS(5, "ABS"),
+
+ OBS(6, "OBS");
private final int code;
private final String name;
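The StorageType hunk above registers OBS as code 6. A standalone sketch of the code-to-constant lookup such an enum supports (the constants below OBS are assumed from the enum's ordering since they are outside the hunk, and the `ofCode` helper is hypothetical, not the plugin's actual API):

```java
import java.util.Arrays;
import java.util.Optional;

public class StorageTypeDemo {

    // Mirrors StorageType after this commit; codes 0-2 are assumed.
    enum StorageType {
        LOCAL(0, "LOCAL"), HDFS(1, "HDFS"), OSS(2, "OSS"), S3(3, "S3"),
        GCS(4, "GCS"), ABS(5, "ABS"), OBS(6, "OBS");

        private final int code;
        private final String name;

        StorageType(int code, String name) {
            this.code = code;
            this.name = name;
        }

        int getCode() { return code; }
        String getName() { return name; }

        // Find the enum constant for a stored integer code, if any.
        static Optional<StorageType> ofCode(int code) {
            return Arrays.stream(values()).filter(t -> t.code == code).findFirst();
        }
    }

    public static void main(String[] args) {
        System.out.println(StorageType.ofCode(6).map(StorageType::getName).orElse("?")); // OBS
    }
}
```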
diff --git
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/pom.xml
similarity index 64%
copy from dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
copy to dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/pom.xml
index a85fcd3c4e..d19923498b 100644
--- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/pom.xml
@@ -24,39 +24,23 @@
<version>dev-SNAPSHOT</version>
</parent>
- <artifactId>dolphinscheduler-storage-all</artifactId>
+ <artifactId>dolphinscheduler-storage-obs</artifactId>
<dependencies>
<dependency>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-storage-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-storage-s3</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-storage-hdfs</artifactId>
- <version>${project.version}</version>
+ <groupId>com.huaweicloud</groupId>
+ <artifactId>esdk-obs-java-bundle</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-storage-oss</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-storage-gcs</artifactId>
+ <artifactId>dolphinscheduler-storage-api</artifactId>
<version>${project.version}</version>
</dependency>
+
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-storage-abs</artifactId>
+ <artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
-
</project>
diff --git
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperator.java
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperator.java
new file mode 100644
index 0000000000..6dc5318519
--- /dev/null
+++
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperator.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.storage.obs;
+
+import static
org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
+import static
org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
+import static
org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_FILE;
+import static
org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_UDF;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.ResUploadType;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import com.obs.services.ObsClient;
+import com.obs.services.exception.ObsException;
+import com.obs.services.internal.ServiceException;
+import com.obs.services.model.DeleteObjectsRequest;
+import com.obs.services.model.GetObjectRequest;
+import com.obs.services.model.ListObjectsRequest;
+import com.obs.services.model.ObjectListing;
+import com.obs.services.model.ObjectMetadata;
+import com.obs.services.model.ObsObject;
+import com.obs.services.model.PutObjectRequest;
+
+@Data
+@Slf4j
+public class ObsStorageOperator implements Closeable, StorageOperate {
+
+ private String accessKeyId;
+
+ private String accessKeySecret;
+
+ private String bucketName;
+
+ private String endPoint;
+
+ private ObsClient obsClient;
+
+ public ObsStorageOperator() {
+ }
+
+ public void init() {
+ this.accessKeyId = readObsAccessKeyID();
+ this.accessKeySecret = readObsAccessKeySecret();
+ this.endPoint = readObsEndPoint();
+ this.bucketName = readObsBucketName();
+ this.obsClient = buildObsClient();
+ ensureBucketSuccessfullyCreated(bucketName);
+ }
+
+    protected String readObsAccessKeyID() {
+        return PropertyUtils.getString(TaskConstants.HUAWEI_CLOUD_ACCESS_KEY_ID);
+    }
+
+    protected String readObsAccessKeySecret() {
+        return PropertyUtils.getString(TaskConstants.HUAWEI_CLOUD_ACCESS_KEY_SECRET);
+    }
+
+ protected String readObsBucketName() {
+ return PropertyUtils.getString(Constants.HUAWEI_CLOUD_OBS_BUCKET_NAME);
+ }
+
+ protected String readObsEndPoint() {
+ return PropertyUtils.getString(Constants.HUAWEI_CLOUD_OBS_END_POINT);
+ }
+
+ @Override
+ public void close() throws IOException {
+ obsClient.close();
+ }
+
+ @Override
+    public void createTenantDirIfNotExists(String tenantCode) throws Exception {
+ mkdir(tenantCode, getObsResDir(tenantCode));
+ mkdir(tenantCode, getObsUdfDir(tenantCode));
+ }
+
+ @Override
+ public String getResDir(String tenantCode) {
+ return getObsResDir(tenantCode) + FOLDER_SEPARATOR;
+ }
+
+ @Override
+ public String getUdfDir(String tenantCode) {
+ return getObsUdfDir(tenantCode) + FOLDER_SEPARATOR;
+ }
+
+ @Override
+ public boolean mkdir(String tenantCode, String path) throws IOException {
+ final String key = path + FOLDER_SEPARATOR;
+ if (!obsClient.doesObjectExist(bucketName, key)) {
+ createObsPrefix(bucketName, key);
+ }
+ return true;
+ }
+
+ protected void createObsPrefix(final String bucketName, final String key) {
+ ObjectMetadata metadata = new ObjectMetadata();
+ metadata.setContentLength(0L);
+ InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
+        PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, emptyContent);
+ obsClient.putObject(putObjectRequest);
+ }
+
+ @Override
+ public String getResourceFullName(String tenantCode, String fileName) {
+ if (fileName.startsWith(FOLDER_SEPARATOR)) {
+ fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
+ }
+ return String.format(FORMAT_S_S, getObsResDir(tenantCode), fileName);
+ }
+
+ @Override
+ public String getResourceFileName(String tenantCode, String fullName) {
+ String resDir = getResDir(tenantCode);
+ return fullName.replaceFirst(resDir, "");
+ }
+
+ @Override
+    public String getFileName(ResourceType resourceType, String tenantCode, String fileName) {
+ if (fileName.startsWith(FOLDER_SEPARATOR)) {
+ fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
+ }
+ return getDir(resourceType, tenantCode) + fileName;
+ }
+
+ @Override
+    public boolean delete(String fullName, List<String> childrenPathList, boolean recursive) throws IOException {
+ // append the resource fullName to the list for deletion.
+ childrenPathList.add(fullName);
+
+        DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName);
+        for (String deleteKey : childrenPathList) {
+            deleteObjectsRequest.addKeyAndVersion(deleteKey);
+        }
+
+ try {
+ obsClient.deleteObjects(deleteObjectsRequest);
+ } catch (Exception e) {
+ log.error("delete objects error", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+    public void download(String tenantCode, String srcFilePath, String dstFilePath,
+                         boolean overwrite) throws IOException {
+ File dstFile = new File(dstFilePath);
+ if (dstFile.isDirectory()) {
+ Files.delete(dstFile.toPath());
+ } else {
+ Files.createDirectories(dstFile.getParentFile().toPath());
+ }
+ ObsObject obsObject = obsClient.getObject(bucketName, srcFilePath);
+ try (
+ InputStream obsInputStream = obsObject.getObjectContent();
+ FileOutputStream fos = new FileOutputStream(dstFilePath)) {
+ byte[] readBuf = new byte[1024];
+ int readLen;
+ while ((readLen = obsInputStream.read(readBuf)) > 0) {
+ fos.write(readBuf, 0, readLen);
+ }
+ } catch (ObsException e) {
+ throw new IOException(e);
+ } catch (FileNotFoundException e) {
+ log.error("cannot find the destination file {}", dstFilePath);
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean exists(String fileName) throws IOException {
+ return obsClient.doesObjectExist(bucketName, fileName);
+ }
+
+ @Override
+    public boolean delete(String filePath, boolean recursive) throws IOException {
+ try {
+ obsClient.deleteObject(bucketName, filePath);
+ return true;
+ } catch (ObsException e) {
+            log.error("failed to delete the object, the resource path is {}", filePath, e);
+ return false;
+ }
+ }
+
+ @Override
+    public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException {
+ obsClient.copyObject(bucketName, srcPath, bucketName, dstPath);
+ if (deleteSource) {
+ obsClient.deleteObject(bucketName, srcPath);
+ }
+ return true;
+ }
+
+ @Override
+ public String getDir(ResourceType resourceType, String tenantCode) {
+ switch (resourceType) {
+ case UDF:
+ return getUdfDir(tenantCode);
+ case FILE:
+ return getResDir(tenantCode);
+ case ALL:
+ return getObsDataBasePath();
+ default:
+ return "";
+ }
+ }
+
+ @Override
+    public boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource,
+                          boolean overwrite) throws IOException {
+ try {
+ obsClient.putObject(bucketName, dstPath, new File(srcFile));
+ if (deleteSource) {
+ Files.delete(Paths.get(srcFile));
+ }
+ return true;
+ } catch (ObsException e) {
+            log.error("upload failed, the bucketName is {}, the filePath is {}", bucketName, dstPath, e);
+ return false;
+ }
+ }
+
+ @Override
+    public List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException {
+ if (StringUtils.isBlank(filePath)) {
+ log.error("file path:{} is empty", filePath);
+ return Collections.emptyList();
+ }
+ ObsObject obsObject = obsClient.getObject(bucketName, filePath);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(obsObject.getObjectContent()))) {
+            Stream<String> stream = bufferedReader.lines().skip(skipLineNums).limit(limit);
+ return stream.collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ public ResUploadType returnStorageType() {
+ return ResUploadType.OBS;
+ }
+
+ @Override
+    public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath, String tenantCode,
+                                                          ResourceType type) {
+ List<StorageEntity> storageEntityList = new ArrayList<>();
+ LinkedList<StorageEntity> foldersToFetch = new LinkedList<>();
+
+ StorageEntity initialEntity = null;
+ try {
+ initialEntity = getFileStatus(path, defaultPath, tenantCode, type);
+ } catch (Exception e) {
+            log.error("error while listing files status recursively, path: {}", path, e);
+ return storageEntityList;
+ }
+ foldersToFetch.add(initialEntity);
+
+ while (!foldersToFetch.isEmpty()) {
+ String pathToExplore = foldersToFetch.pop().getFullName();
+ try {
+                List<StorageEntity> tempList = listFilesStatus(pathToExplore, defaultPath, tenantCode, type);
+ for (StorageEntity temp : tempList) {
+ if (temp.isDirectory()) {
+ foldersToFetch.add(temp);
+ }
+ }
+ storageEntityList.addAll(tempList);
+ } catch (Exception e) {
+                log.error("error while listing files status recursively, path: {}", pathToExplore, e);
+ }
+ }
+
+ return storageEntityList;
+ }
+
+ @Override
+    public List<StorageEntity> listFilesStatus(String path, String defaultPath, String tenantCode,
+                                               ResourceType type) throws Exception {
+ List<StorageEntity> storageEntityList = new ArrayList<>();
+
+ ListObjectsRequest request = new ListObjectsRequest();
+ request.setBucketName(bucketName);
+ request.setPrefix(path);
+ request.setDelimiter(FOLDER_SEPARATOR);
+ ObjectListing result = null;
+ try {
+ result = obsClient.listObjects(request);
+ } catch (Exception e) {
+ throw new ServiceException("Get ObsClient file list exception", e);
+ }
+
+ while (result != null) {
+ String nextMarker = result.getNextMarker();
+ List<ObsObject> objects = result.getObjects();
+
+ for (ObsObject object : objects) {
+ if (!object.getObjectKey().endsWith(FOLDER_SEPARATOR)) {
+ // the path is a file
+                    String[] aliasArr = object.getObjectKey().split(FOLDER_SEPARATOR);
+                    String alias = aliasArr[aliasArr.length - 1];
+                    String fileName = StringUtils.difference(defaultPath, object.getObjectKey());
+
+ StorageEntity entity = new StorageEntity();
+ ObjectMetadata metadata = object.getMetadata();
+ entity.setAlias(alias);
+ entity.setFileName(fileName);
+ entity.setFullName(object.getObjectKey());
+ entity.setDirectory(false);
+ entity.setUserName(tenantCode);
+ entity.setType(type);
+ entity.setSize(metadata.getContentLength());
+ entity.setCreateTime(metadata.getLastModified());
+ entity.setUpdateTime(metadata.getLastModified());
+ entity.setPfullName(path);
+
+ storageEntityList.add(entity);
+ }
+ }
+
+ for (String commonPrefix : result.getCommonPrefixes()) {
+ // the paths in commonPrefix are directories
+ String suffix = StringUtils.difference(path, commonPrefix);
+                String fileName = StringUtils.difference(defaultPath, commonPrefix);
+
+ StorageEntity entity = new StorageEntity();
+ entity.setAlias(suffix);
+ entity.setFileName(fileName);
+ entity.setFullName(commonPrefix);
+ entity.setDirectory(true);
+ entity.setUserName(tenantCode);
+ entity.setType(type);
+ entity.setSize(0);
+ entity.setCreateTime(null);
+ entity.setUpdateTime(null);
+ entity.setPfullName(path);
+
+ storageEntityList.add(entity);
+ }
+
+            if (StringUtils.isBlank(nextMarker)) {
+                break;
+            }
+
+ request.setMarker(nextMarker);
+ try {
+ result = obsClient.listObjects(request);
+ } catch (Exception e) {
+                throw new ServiceException("Get ObsClient file list exception", e);
+ }
+ }
+ return storageEntityList;
+ }
+
+ @Override
+    public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode,
+                                       ResourceType type) throws Exception {
+
+ if (path.endsWith(FOLDER_SEPARATOR)) {
+ // the path is a directory that may or may not exist in ObsClient
+ String alias = findDirAlias(path);
+ String fileName = StringUtils.difference(defaultPath, path);
+
+ StorageEntity entity = new StorageEntity();
+ entity.setAlias(alias);
+ entity.setFileName(fileName);
+ entity.setFullName(path);
+ entity.setDirectory(true);
+ entity.setUserName(tenantCode);
+ entity.setType(type);
+ entity.setSize(0);
+
+ return entity;
+
+ } else {
+ GetObjectRequest request = new GetObjectRequest();
+ request.setBucketName(bucketName);
+ request.setObjectKey(path);
+ ObsObject object;
+ try {
+ object = obsClient.getObject(request);
+ } catch (Exception e) {
+                throw new ServiceException("Get ObsClient file list exception", e);
+ }
+
+ String[] aliasArr = object.getObjectKey().split(FOLDER_SEPARATOR);
+ String alias = aliasArr[aliasArr.length - 1];
+            String fileName = StringUtils.difference(defaultPath, object.getObjectKey());
+
+ StorageEntity entity = new StorageEntity();
+ ObjectMetadata metadata = object.getMetadata();
+ entity.setAlias(alias);
+ entity.setFileName(fileName);
+ entity.setFullName(object.getObjectKey());
+ entity.setDirectory(false);
+ entity.setUserName(tenantCode);
+ entity.setType(type);
+ entity.setSize(metadata.getContentLength());
+ entity.setCreateTime(metadata.getLastModified());
+ entity.setUpdateTime(metadata.getLastModified());
+
+ return entity;
+ }
+
+ }
+
+ @Override
+ public void deleteTenant(String tenantCode) throws Exception {
+ deleteTenantCode(tenantCode);
+ }
+
+    public String getObsResDir(String tenantCode) {
+        return String.format("%s/" + RESOURCE_TYPE_FILE, getObsTenantDir(tenantCode));
+    }
+
+    public String getObsUdfDir(String tenantCode) {
+        return String.format("%s/" + RESOURCE_TYPE_UDF, getObsTenantDir(tenantCode));
+    }
+
+ public String getObsTenantDir(String tenantCode) {
+ return String.format(FORMAT_S_S, getObsDataBasePath(), tenantCode);
+ }
+
+ public String getObsDataBasePath() {
+ if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) {
+ return "";
+ } else {
+ return RESOURCE_UPLOAD_PATH.replaceFirst(FOLDER_SEPARATOR, "");
+ }
+ }
+
+ protected void deleteTenantCode(String tenantCode) {
+ deleteDir(getResDir(tenantCode));
+ deleteDir(getUdfDir(tenantCode));
+ }
+
+ public void ensureBucketSuccessfullyCreated(String bucketName) {
+ if (StringUtils.isBlank(bucketName)) {
+            throw new IllegalArgumentException("resource.huawei.cloud.obs.bucket.name is empty");
+ }
+
+ boolean existsBucket = obsClient.headBucket(bucketName);
+ if (!existsBucket) {
+ throw new IllegalArgumentException(
+                    "bucketName: " + bucketName + " does not exist, you need to create it yourself");
+ }
+
+ log.info("bucketName: {} has been found", bucketName);
+ }
+
+ protected void deleteDir(String directoryName) {
+ if (obsClient.doesObjectExist(bucketName, directoryName)) {
+ obsClient.deleteObject(bucketName, directoryName);
+ }
+ }
+
+ protected ObsClient buildObsClient() {
+ return new ObsClient(accessKeyId, accessKeySecret, endPoint);
+ }
+
+ private String findDirAlias(String dirPath) {
+ if (!dirPath.endsWith(FOLDER_SEPARATOR)) {
+ return dirPath;
+ }
+
+ Path path = Paths.get(dirPath);
+ return path.getName(path.getNameCount() - 1) + FOLDER_SEPARATOR;
+ }
+}
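For orientation, the tenant directory layout that `getObsTenantDir`, `getObsResDir` and `getObsUdfDir` produce can be sketched independently of the OBS SDK. This is a minimal sketch: the `ObsPathLayout` class is illustrative, and `dolphinscheduler` stands in for `RESOURCE_UPLOAD_PATH` with its leading `/` stripped, as `getObsDataBasePath()` does.

```java
// Sketch of the key layout built by the helpers above (illustrative class,
// not part of the patch). Base path assumes RESOURCE_UPLOAD_PATH=/dolphinscheduler.
class ObsPathLayout {

    static final String BASE_PATH = "dolphinscheduler";

    // dolphinscheduler/<tenant>
    static String tenantDir(String tenantCode) {
        return String.format("%s/%s", BASE_PATH, tenantCode);
    }

    // dolphinscheduler/<tenant>/resources
    static String resDir(String tenantCode) {
        return String.format("%s/resources", tenantDir(tenantCode));
    }

    // dolphinscheduler/<tenant>/udfs
    static String udfDir(String tenantCode) {
        return String.format("%s/udfs", tenantDir(tenantCode));
    }
}
```

These are the same strings the unit tests in this commit assert against (e.g. `dolphinscheduler/%s/resources`).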
diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorFactory.java
similarity index 51%
copy from dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
copy to dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorFactory.java
index 33c34f4c53..2e67103931 100644
--- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorFactory.java
@@ -15,42 +15,29 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.storage.api;
+package org.apache.dolphinscheduler.plugin.storage.obs;
-import java.util.Optional;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperateFactory;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageType;
-public enum StorageType {
+import com.google.auto.service.AutoService;
- LOCAL(0, "LOCAL"),
- HDFS(1, "HDFS"),
- OSS(2, "OSS"),
- S3(3, "S3"),
- GCS(4, "GCS"),
+@AutoService({StorageOperateFactory.class})
+public class ObsStorageOperatorFactory implements StorageOperateFactory {
- ABS(5, "ABS");
-
- private final int code;
- private final String name;
-
- StorageType(int code, String name) {
- this.code = code;
- this.name = name;
- }
-
- public int getCode() {
- return code;
+ public ObsStorageOperatorFactory() {
}
- public String getName() {
- return name;
+ @Override
+ public StorageOperate createStorageOperate() {
+        ObsStorageOperator obsOperator = new ObsStorageOperator();
+        obsOperator.init();
+        return obsOperator;
}
- public static Optional<StorageType> getStorageType(String name) {
- for (StorageType storageType : StorageType.values()) {
- if (storageType.getName().equals(name)) {
- return Optional.of(storageType);
- }
- }
- return Optional.empty();
+ @Override
+ public StorageType getStorageOperate() {
+ return StorageType.OBS;
}
}
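The `listFilesStatus` method in the operator above pages through bucket contents with OBS's marker-based listing. Stripped of the SDK, the loop shape looks like this (a sketch only: `Page` and the `fetch` function are hypothetical stand-ins for `ObjectListing` and `obsClient.listObjects`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Marker-based pagination, mirroring the nextMarker loop in listFilesStatus.
// "Page" is a hypothetical stand-in for the SDK's ObjectListing result.
class MarkerPagination {

    static class Page {
        final List<String> keys;
        final String nextMarker; // null or empty when this is the last page

        Page(List<String> keys, String nextMarker) {
            this.keys = keys;
            this.nextMarker = nextMarker;
        }
    }

    // fetch.apply(marker) issues one list request starting at marker (null = first page)
    static List<String> listAll(Function<String, Page> fetch) {
        List<String> all = new ArrayList<>();
        Page page = fetch.apply(null);
        while (page != null) {
            all.addAll(page.keys);
            String marker = page.nextMarker;
            if (marker == null || marker.isEmpty()) {
                break; // last page: stop instead of issuing another request
            }
            page = fetch.apply(marker);
        }
        return all;
    }
}
```

The same pattern appears in the other object-store operators in this repository; the marker handed back by one response seeds the next request until the service stops returning one.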
diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorTest.java
new file mode 100644
index 0000000000..4aabdb6bd8
--- /dev/null
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.storage.obs;
+
+import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
+import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.obs.services.ObsClient;
+
+@ExtendWith(MockitoExtension.class)
+public class ObsStorageOperatorTest {
+
+ private static final String ACCESS_KEY_ID_MOCK = "ACCESS_KEY_ID_MOCK";
+    private static final String ACCESS_KEY_SECRET_MOCK = "ACCESS_KEY_SECRET_MOCK";
+ private static final String END_POINT_MOCK = "END_POINT_MOCK";
+ private static final String BUCKET_NAME_MOCK = "BUCKET_NAME_MOCK";
+ private static final String TENANT_CODE_MOCK = "TENANT_CODE_MOCK";
+ private static final String DIR_MOCK = "DIR_MOCK";
+ private static final String FILE_NAME_MOCK = "FILE_NAME_MOCK";
+ private static final String FILE_PATH_MOCK = "FILE_PATH_MOCK";
+ private static final String FULL_NAME = "/tmp/dir1/";
+
+ private static final String DEFAULT_PATH = "/tmp/";
+ @Mock
+ private ObsClient obsClientMock;
+
+ private ObsStorageOperator obsOperator;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ obsOperator = spy(new ObsStorageOperator());
+ doReturn(ACCESS_KEY_ID_MOCK).when(obsOperator)
+ .readObsAccessKeyID();
+ doReturn(ACCESS_KEY_SECRET_MOCK).when(obsOperator)
+ .readObsAccessKeySecret();
+ doReturn(BUCKET_NAME_MOCK).when(obsOperator).readObsBucketName();
+ doReturn(END_POINT_MOCK).when(obsOperator).readObsEndPoint();
+ doReturn(obsClientMock).when(obsOperator).buildObsClient();
+ doNothing().when(obsOperator).ensureBucketSuccessfullyCreated(any());
+
+ obsOperator.init();
+
+ }
+
+ @Test
+ public void initObsOperator() {
+ verify(obsOperator, times(1)).buildObsClient();
+        Assertions.assertEquals(ACCESS_KEY_ID_MOCK, obsOperator.getAccessKeyId());
+        Assertions.assertEquals(ACCESS_KEY_SECRET_MOCK, obsOperator.getAccessKeySecret());
+ Assertions.assertEquals(BUCKET_NAME_MOCK, obsOperator.getBucketName());
+ }
+
+ @Test
+ public void tearDownObsOperator() throws IOException {
+ doNothing().when(obsClientMock).close();
+ obsOperator.close();
+ verify(obsClientMock, times(1)).close();
+ }
+
+ @Test
+ public void createTenantResAndUdfDir() throws Exception {
+ doReturn(DIR_MOCK).when(obsOperator).getObsResDir(TENANT_CODE_MOCK);
+ doReturn(DIR_MOCK).when(obsOperator).getObsUdfDir(TENANT_CODE_MOCK);
+ doReturn(true).when(obsOperator).mkdir(TENANT_CODE_MOCK, DIR_MOCK);
+ obsOperator.createTenantDirIfNotExists(TENANT_CODE_MOCK);
+ verify(obsOperator, times(2)).mkdir(TENANT_CODE_MOCK, DIR_MOCK);
+ }
+
+ @Test
+ public void getResDir() {
+        final String expectedResourceDir = String.format("dolphinscheduler/%s/resources/", TENANT_CODE_MOCK);
+ final String dir = obsOperator.getResDir(TENANT_CODE_MOCK);
+ Assertions.assertEquals(expectedResourceDir, dir);
+ }
+
+ @Test
+ public void getUdfDir() {
+        final String expectedUdfDir = String.format("dolphinscheduler/%s/udfs/", TENANT_CODE_MOCK);
+ final String dir = obsOperator.getUdfDir(TENANT_CODE_MOCK);
+ Assertions.assertEquals(expectedUdfDir, dir);
+ }
+
+ @Test
+ public void mkdirWhenDirExists() {
+ boolean isSuccess = false;
+ try {
+ final String key = DIR_MOCK + FOLDER_SEPARATOR;
+            doReturn(true).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, key);
+            isSuccess = obsOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK);
+            verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, key);
+
+ } catch (IOException e) {
+ Assertions.fail("test failed due to unexpected IO exception");
+ }
+
+ Assertions.assertTrue(isSuccess);
+ }
+
+ @Test
+ public void mkdirWhenDirNotExists() {
+ boolean isSuccess = true;
+ try {
+ final String key = DIR_MOCK + FOLDER_SEPARATOR;
+            doReturn(false).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, key);
+            doNothing().when(obsOperator).createObsPrefix(BUCKET_NAME_MOCK, key);
+            isSuccess = obsOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK);
+            verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, key);
+            verify(obsOperator, times(1)).createObsPrefix(BUCKET_NAME_MOCK, key);
+
+ } catch (IOException e) {
+ Assertions.fail("test failed due to unexpected IO exception");
+ }
+
+ Assertions.assertTrue(isSuccess);
+ }
+
+ @Test
+ public void getResourceFullName() {
+ final String expectedResourceFullName =
+                String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
+        final String resourceFullName = obsOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK);
+ Assertions.assertEquals(expectedResourceFullName, resourceFullName);
+ }
+
+ @Test
+ public void getResourceFileName() {
+ final String expectedResourceFileName = FILE_NAME_MOCK;
+        final String resourceFullName =
+                String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
+        final String resourceFileName = obsOperator.getResourceFileName(TENANT_CODE_MOCK, resourceFullName);
+ Assertions.assertEquals(expectedResourceFileName, resourceFileName);
+ }
+
+ @Test
+ public void getFileName() {
+        final String expectedFileName =
+                String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
+        final String fileName = obsOperator.getFileName(ResourceType.FILE, TENANT_CODE_MOCK, FILE_NAME_MOCK);
+ Assertions.assertEquals(expectedFileName, fileName);
+ }
+
+ @Test
+ public void exists() {
+ boolean doesExist = false;
+        doReturn(true).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, FILE_NAME_MOCK);
+ try {
+ doesExist = obsOperator.exists(FILE_NAME_MOCK);
+ } catch (IOException e) {
+ Assertions.fail("unexpected IO exception in unit test");
+ }
+
+ Assertions.assertTrue(doesExist);
+        verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, FILE_NAME_MOCK);
+ }
+
+ @Test
+ public void delete() {
+ boolean isDeleted = false;
+        doReturn(null).when(obsClientMock).deleteObject(anyString(), anyString());
+ try {
+ isDeleted = obsOperator.delete(FILE_NAME_MOCK, true);
+ } catch (IOException e) {
+ Assertions.fail("unexpected IO exception in unit test");
+ }
+
+ Assertions.assertTrue(isDeleted);
+ verify(obsClientMock, times(1)).deleteObject(anyString(), anyString());
+ }
+
+ @Test
+ public void copy() {
+ boolean isSuccess = false;
+        doReturn(null).when(obsClientMock).copyObject(anyString(), anyString(), anyString(), anyString());
+ try {
+            isSuccess = obsOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, false, false);
+ } catch (IOException e) {
+ Assertions.fail("unexpected IO exception in unit test");
+ }
+
+ Assertions.assertTrue(isSuccess);
+        verify(obsClientMock, times(1)).copyObject(anyString(), anyString(), anyString(), anyString());
+ }
+
+ @Test
+ public void deleteTenant() {
+ doNothing().when(obsOperator).deleteTenantCode(anyString());
+ try {
+ obsOperator.deleteTenant(TENANT_CODE_MOCK);
+ } catch (Exception e) {
+ Assertions.fail("unexpected exception caught in unit test");
+ }
+
+ verify(obsOperator, times(1)).deleteTenantCode(anyString());
+ }
+
+ @Test
+ public void getObsResDir() {
+        final String expectedObsResDir = String.format("dolphinscheduler/%s/resources", TENANT_CODE_MOCK);
+ final String obsResDir = obsOperator.getObsResDir(TENANT_CODE_MOCK);
+ Assertions.assertEquals(expectedObsResDir, obsResDir);
+ }
+
+ @Test
+ public void getObsUdfDir() {
+        final String expectedObsUdfDir = String.format("dolphinscheduler/%s/udfs", TENANT_CODE_MOCK);
+ final String obsUdfDir = obsOperator.getObsUdfDir(TENANT_CODE_MOCK);
+ Assertions.assertEquals(expectedObsUdfDir, obsUdfDir);
+ }
+
+ @Test
+ public void getObsTenantDir() {
+        final String expectedObsTenantDir = String.format(FORMAT_S_S, DIR_MOCK, TENANT_CODE_MOCK);
+ doReturn(DIR_MOCK).when(obsOperator).getObsDataBasePath();
+        final String obsTenantDir = obsOperator.getObsTenantDir(TENANT_CODE_MOCK);
+ Assertions.assertEquals(expectedObsTenantDir, obsTenantDir);
+ }
+
+ @Test
+ public void deleteDir() {
+        doReturn(true).when(obsClientMock).doesObjectExist(anyString(), anyString());
+ obsOperator.deleteDir(DIR_MOCK);
+ verify(obsClientMock, times(1)).deleteObject(anyString(), anyString());
+ }
+
+ @Test
+ public void testGetFileStatus() throws Exception {
+        StorageEntity entity = obsOperator.getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
+ Assertions.assertEquals(FULL_NAME, entity.getFullName());
+ Assertions.assertEquals("dir1/", entity.getFileName());
+ }
+
+ @Test
+ public void testListFilesStatus() throws Exception {
+        List<StorageEntity> result =
+                obsOperator.listFilesStatus("dolphinscheduler/default/resources/",
+                        "dolphinscheduler/default/resources/",
+                        "default", ResourceType.FILE);
+ Assertions.assertEquals(0, result.size());
+ }
+
+ @Test
+ public void testListFilesStatusRecursively() throws Exception {
+ StorageEntity entity = new StorageEntity();
+ entity.setFullName(FULL_NAME);
+
+        doReturn(entity).when(obsOperator).getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
+        doReturn(Collections.EMPTY_LIST).when(obsOperator).listFilesStatus(anyString(), anyString(), anyString(),
+                Mockito.any(ResourceType.class));
+
+        List<StorageEntity> result =
+                obsOperator.listFilesStatusRecursively(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
+ Assertions.assertEquals(0, result.size());
+ }
+}
diff --git a/dolphinscheduler-storage-plugin/pom.xml b/dolphinscheduler-storage-plugin/pom.xml
index 7e27358a64..a6a86bc2a6 100644
--- a/dolphinscheduler-storage-plugin/pom.xml
+++ b/dolphinscheduler-storage-plugin/pom.xml
@@ -35,6 +35,7 @@
<module>dolphinscheduler-storage-oss</module>
<module>dolphinscheduler-storage-gcs</module>
<module>dolphinscheduler-storage-abs</module>
+ <module>dolphinscheduler-storage-obs</module>
</modules>
<dependencyManagement>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index 2d314382c5..ab46bb58d5 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -475,6 +475,12 @@ public class TaskConstants {
     public static final String ALIBABA_CLOUD_ACCESS_KEY_SECRET = "resource.alibaba.cloud.access.key.secret";
     public static final String ALIBABA_CLOUD_REGION = "resource.alibaba.cloud.region";
+ /**
+ * huawei cloud config
+ */
+    public static final String HUAWEI_CLOUD_ACCESS_KEY_ID = "resource.huawei.cloud.access.key.id";
+    public static final String HUAWEI_CLOUD_ACCESS_KEY_SECRET = "resource.huawei.cloud.access.key.secret";
+
/**
* use for k8s task
*/
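Taken together with the bucket and endpoint constants the operator reads (`Constants.HUAWEI_CLOUD_OBS_BUCKET_NAME`, `Constants.HUAWEI_CLOUD_OBS_END_POINT`), enabling OBS in `common.properties` would look roughly like the fragment below. The bucket/endpoint key names and the sample endpoint are assumptions inferred from the access-key keys above and the naming pattern of the other providers; the authoritative values are in the `common.properties` hunk of this commit, which is not shown in this excerpt.

```properties
# Sketch: enable Huawei Cloud OBS as resource-center storage.
# Bucket/endpoint key names below are inferred, not confirmed by this excerpt;
# verify against the common.properties shipped with this commit.
resource.storage.type=OBS
resource.huawei.cloud.access.key.id=<your-access-key-id>
resource.huawei.cloud.access.key.secret=<your-access-key-secret>
resource.huawei.cloud.obs.bucket.name=dolphinscheduler
resource.huawei.cloud.obs.end.point=obs.cn-southwest-2.myhuaweicloud.com
```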
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 28d2f1fe70..23fb7919d9 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -495,3 +495,4 @@ snowflake-jdbc-3.13.29.jar
azure-storage-blob-12.21.0.jar
azure-storage-internal-avro-12.6.0.jar
vertica-jdbc-12.0.4-0.jar
+esdk-obs-java-bundle-3.23.3.jar
\ No newline at end of file