This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new af7b359333 [Feature][Task Plugin] Add SageMaker Pipeline task plugin 
for MLOps scenario (#10826) (#10935)
af7b359333 is described below

commit af7b359333357ce3e32972c914ff539c8c9aad51
Author: JieguangZhou <[email protected]>
AuthorDate: Thu Jul 21 18:16:58 2022 +0800

    [Feature][Task Plugin] Add SageMaker Pipeline task plugin for MLOps 
scenario (#10826) (#10935)
    
    * add sagemaker pipeline task plugin
    
    [DOC] add Sagemaker task plugin document
    
    change license and fix dependencies
    
    * [fix] Optimize the code
    
    * Update 
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
    
    Optimization parameter judgment
    
    Co-authored-by: caishunfeng <[email protected]>
    
    * [fix] nips
    
    Co-authored-by: caishunfeng <[email protected]>
---
 docs/configs/docsdev.js                            |   8 +
 docs/docs/en/guide/task/sagemaker.md               |  64 +++++++
 docs/docs/zh/guide/task/sagemaker.md               |  57 ++++++
 docs/img/tasks/demo/sagemaker_pipeline.png         | Bin 0 -> 127055 bytes
 docs/img/tasks/icons/sagemaker.png                 | Bin 0 -> 25945 bytes
 dolphinscheduler-dist/release-docs/LICENSE         |   1 +
 .../licenses/LICENSE-aws-java-sdk-sagemaker.txt    | 201 +++++++++++++++++++++
 .../dolphinscheduler-task-all/pom.xml              |   6 +
 .../dolphinscheduler-task-sagemaker/pom.xml        |  54 ++++++
 .../plugin/task/sagemaker/PipelineUtils.java       | 125 +++++++++++++
 .../plugin/task/sagemaker/SagemakerConstants.java  |  27 +++
 .../plugin/task/sagemaker/SagemakerParameters.java |  43 +++++
 .../plugin/task/sagemaker/SagemakerTask.java       | 161 +++++++++++++++++
 .../task/sagemaker/SagemakerTaskChannel.java       |  49 +++++
 .../sagemaker/SagemakerTaskChannelFactory.java     |  45 +++++
 .../task/sagemaker/SagemakerTaskException.java     |  36 ++++
 .../plugin/task/sagemaker/SagemakerTaskTest.java   | 132 ++++++++++++++
 .../task/sagemaker/SagemakerRequestJson.json       |   9 +
 dolphinscheduler-task-plugin/pom.xml               |   1 +
 .../public/images/task-icons/sagemaker.png         | Bin 0 -> 25945 bytes
 .../public/images/task-icons/sagemaker_hover.png   | Bin 0 -> 142778 bytes
 .../projects/task/components/node/fields/index.ts  |   1 +
 .../task/components/node/fields/use-sagemaker.ts   |  38 ++++
 .../projects/task/components/node/format-data.ts   |   4 +
 .../projects/task/components/node/tasks/index.ts   |   4 +-
 .../task/components/node/tasks/use-sagemaker.ts    |  81 +++++++++
 .../views/projects/task/components/node/types.ts   |   1 +
 .../src/views/projects/task/constants/task-type.ts |   5 +
 .../workflow/components/dag/dag.module.scss        |   6 +
 tools/dependencies/known-dependencies.txt          |   1 +
 30 files changed, 1159 insertions(+), 1 deletion(-)

diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js
index 0fdd2a41e4..b688843812 100644
--- a/docs/configs/docsdev.js
+++ b/docs/configs/docsdev.js
@@ -181,6 +181,10 @@ export default {
                                 title: 'Dinky',
                                 link: 
'/en-us/docs/dev/user_doc/guide/task/dinky.html',
                             },
+                            {
+                                title: 'SageMaker',
+                                link: 
'/en-us/docs/dev/user_doc/guide/task/sagemaker.html',
+                            },
                         ],
                     },
                     {
@@ -601,6 +605,10 @@ export default {
                                 title: 'Dinky',
                                 link: 
'/zh-cn/docs/dev/user_doc/guide/task/dinky.html',
                             },
+                            {
+                                title: 'SageMaker',
+                                link: 
'/zh-cn/docs/dev/user_doc/guide/task/sagemaker.html',
+                            },
                         ],
                     },
                     {
diff --git a/docs/docs/en/guide/task/sagemaker.md 
b/docs/docs/en/guide/task/sagemaker.md
new file mode 100644
index 0000000000..8ee01b1b66
--- /dev/null
+++ b/docs/docs/en/guide/task/sagemaker.md
@@ -0,0 +1,64 @@
+# SageMaker Node
+
+## Overview
+
+[Amazon SageMaker](https://docs.aws.amazon.com/sagemaker/index.html) is a 
fully managed machine learning service. With Amazon SageMaker, data scientists 
and developers can quickly build and train machine learning models, and then 
deploy them into a production-ready hosted environment.
+
+[Amazon SageMaker Model Building 
Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html) is a 
tool for building machine learning pipelines that take advantage of direct 
SageMaker integration.
+
+
+For users using big data and machine learning, the SageMaker task plugin helps 
users connect big data workflows with SageMaker usage scenarios.
+
+DolphinScheduler SageMaker task plugin features are as follows:
+
+- Start a SageMaker pipeline execution. Continuously get the execution status 
until the pipeline completes execution.
+
+## Create Task
+
+- Click `Project Management -> Project Name -> Workflow Definition`, and click 
the "Create Workflow" button to enter the
+  DAG editing page.
+- Drag from the toolbar <img src="../../../../img/tasks/icons/sagemaker.png" 
width="15"/> task node to canvas.
+
+## Task Example
+
+First, introduce some general parameters of DolphinScheduler:
+
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node schedules normally, if it does 
not need to execute, select
+  the `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, 
execute in the order of priority from high
+  to low, and tasks with the same priority will execute in a first-in 
first-out order.
+- **Worker grouping**: Assign tasks to the machines of the worker group to 
execute. If `Default` is selected,
+  randomly select a worker machine for execution.
+- **Environment Name**: Configure the environment in which to run the script.
+- **Times of failed retry attempts**: The number of times to resubmit the task 
after it fails.
+- **Failed retry interval**: The time interval (unit minute) for resubmitting 
the task after a failed task.
+- **Delayed execution time**: The time (unit minute) that a task delays in 
execution.
+- **Timeout alarm**: Check the timeout alarm and timeout failure. When the 
task runs longer than the "timeout", an alarm
+  email will be sent and the task execution will fail.
+- **Predecessor task**: Selecting a predecessor task for the current task, 
will set the selected predecessor task as
+  upstream of the current task.
+
+Here are some specific parameters for the SageMaker plugin:
+
+- **SagemakerRequestJson**: Request parameters of StartPipelineExecution; see 
also the [AWS 
API](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html).
+
+
+The task plugin is shown as follows:
+
+![sagemaker_pipeline](../../../../img/tasks/demo/sagemaker_pipeline.png)
+
+
+
+## Environment to prepare
+
+Some AWS configuration is required. Set the following fields in the file `common.properties`:
+```yaml
+# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This 
configuration is required
+resource.aws.access.key.id=<YOUR AWS ACCESS KEY>
+# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This 
configuration is required
+resource.aws.secret.access.key=<YOUR AWS SECRET KEY>
+# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This 
configuration is required
+resource.aws.region=<AWS REGION>
+```
\ No newline at end of file
diff --git a/docs/docs/zh/guide/task/sagemaker.md 
b/docs/docs/zh/guide/task/sagemaker.md
new file mode 100644
index 0000000000..1b0e728e82
--- /dev/null
+++ b/docs/docs/zh/guide/task/sagemaker.md
@@ -0,0 +1,57 @@
+# SageMaker 节点
+
+## 综述
+
+[Amazon SageMaker](https://aws.amazon.com/cn/pm/sagemaker) 是一个云机器学习平台。 
提供了完整的基础设施,工具和工作流来帮助用户可以创建、训练和发布机器学习模型。
+
+[Amazon SageMaker Model Building 
Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html) 
是一个可以直接使用 SageMaker 各种集成的机器学习管道构建工具,用户可以使用 Amazon SageMaker Pipeline 
来构建端到端的机器学习系统。
+
+对于使用大数据与人工智能的用户,SageMaker 任务组件可以帮助用户串联起大数据工作流与 SageMaker 的使用场景。
+
+DolphinScheduler SageMaker 组件的功能: 
+- 启动 SageMaker Pipeline Execution,并持续获取状态,直至Pipeline执行完成。 
+
+## 创建任务
+
+- 点击项目管理-项目名称-工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;
+- 拖动工具栏的 <img src="../../../../img/tasks/icons/sagemaker.png" width="15"/> 
任务节点到画板中。
+
+
+## 任务样例
+
+首先介绍一些DS通用参数
+
+- **节点名称** :设置任务的名称。一个工作流定义中的节点名称是唯一的。
+- **运行标志** :标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
+- **描述** :描述该节点的功能。
+- **任务优先级** :worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
+- **Worker 分组** :任务分配给 worker 组的机器执行,选择 Default,会随机选择一台 worker 机执行。
+- **环境名称** :配置运行脚本的环境。
+- **失败重试次数** :任务失败重新提交的次数。
+- **失败重试间隔** :任务失败重新提交任务的时间间隔,以分钟为单位。
+- **延迟执行时间** :任务延迟执行的时间,以分钟为单位。
+- **超时告警** :勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。
+- **前置任务** :选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
+
+以上参数如无特殊需求,可以默认即可
+
+- **SagemakerRequestJson**: 启动 SageMaker Pipeline 所需的请求参数,可见 [AWS 
API](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html)
+
+
+组件图示如下:
+
+![sagemaker_pipeline](../../../../img/tasks/demo/sagemaker_pipeline.png)
+
+
+
+## 环境配置
+
+需要进行AWS的一些配置,修改`common.properties`中的以下配置项为你的配置信息
+```yaml
+# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This 
configuration is required
+resource.aws.access.key.id=<YOUR AWS ACCESS KEY>
+# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This 
configuration is required
+resource.aws.secret.access.key=<YOUR AWS SECRET KEY>
+# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This 
configuration is required
+resource.aws.region=<AWS REGION>
+```
\ No newline at end of file
diff --git a/docs/img/tasks/demo/sagemaker_pipeline.png 
b/docs/img/tasks/demo/sagemaker_pipeline.png
new file mode 100644
index 0000000000..be4eff1653
Binary files /dev/null and b/docs/img/tasks/demo/sagemaker_pipeline.png differ
diff --git a/docs/img/tasks/icons/sagemaker.png 
b/docs/img/tasks/icons/sagemaker.png
new file mode 100644
index 0000000000..9b8206e741
Binary files /dev/null and b/docs/img/tasks/icons/sagemaker.png differ
diff --git a/dolphinscheduler-dist/release-docs/LICENSE 
b/dolphinscheduler-dist/release-docs/LICENSE
index 67a90d91a1..adb79ca250 100644
--- a/dolphinscheduler-dist/release-docs/LICENSE
+++ b/dolphinscheduler-dist/release-docs/LICENSE
@@ -432,6 +432,7 @@ The text of each license is also included at 
licenses/LICENSE-[project].txt.
     aws-java-sdk-s3 1.12.160  
https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-s3/1.12.160  
Apache 2.0
     aws-java-sdk-core-1.12.160 
https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-core/1.12.160  
Apache 2.0
     aws-java-sdk-kms-1.12.160 
https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-kms/1.12.160  
Apache 2.0
+    aws-java-sdk-sagemaker-1.12.160 
https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-sagemaker/1.12.160
  Apache 2.0
     commons-text 1.8: 
https://mvnrepository.com/artifact/org.apache.commons/commons-text/1.8, Apache 
2.0
     httpasyncclient 4.1.4: 
https://mvnrepository.com/artifact/org.apache.httpcomponents/httpasyncclient/4.1.4,
 Apache 2.0
     httpcore-nio 4.4.14: 
https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore-nio/4.4.14,
 Apache 2.0
diff --git 
a/dolphinscheduler-dist/release-docs/licenses/LICENSE-aws-java-sdk-sagemaker.txt
 
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-aws-java-sdk-sagemaker.txt
new file mode 100644
index 0000000000..f49a4e16e6
--- /dev/null
+++ 
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-aws-java-sdk-sagemaker.txt
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
index 759b94505e..aebaa5e07f 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
@@ -183,6 +183,12 @@
             <artifactId>dolphinscheduler-task-dinky</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-sagemaker</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
new file mode 100644
index 0000000000..66f761d825
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dolphinscheduler-task-plugin</artifactId>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dolphinscheduler-task-sagemaker</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-sagemaker</artifactId>
+            <version>1.12.160</version>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java
new file mode 100644
index 0000000000..b5df5b9ea5
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.sagemaker.AmazonSageMaker;
+import com.amazonaws.services.sagemaker.model.DescribePipelineExecutionRequest;
+import com.amazonaws.services.sagemaker.model.DescribePipelineExecutionResult;
+import 
com.amazonaws.services.sagemaker.model.ListPipelineExecutionStepsRequest;
+import com.amazonaws.services.sagemaker.model.ListPipelineExecutionStepsResult;
+import com.amazonaws.services.sagemaker.model.PipelineExecutionStep;
+import com.amazonaws.services.sagemaker.model.StartPipelineExecutionRequest;
+import com.amazonaws.services.sagemaker.model.StartPipelineExecutionResult;
+import com.amazonaws.services.sagemaker.model.StopPipelineExecutionRequest;
+import com.amazonaws.services.sagemaker.model.StopPipelineExecutionResult;
+
+/**
+ * Helper that drives a SageMaker pipeline execution through an {@link AmazonSageMaker} client:
+ * start it, poll its status until it is no longer "Executing", and request a stop.
+ *
+ * <p>The execution ARN and client request token captured by the most recent successful
+ * {@link #startPipelineExecution(StartPipelineExecutionRequest)} call are kept on the instance
+ * and reused by the other methods.
+ */
+public class PipelineUtils {
+
+    /** Status value for which {@link #checkPipelineExecutionStatus()} keeps polling. */
+    private static final String STATUS_EXECUTING = "Executing";
+
+    /** Status value that {@link #checkPipelineExecutionStatus()} maps to the success exit code. */
+    private static final String STATUS_SUCCEEDED = "Succeeded";
+
+    protected final Logger logger =
+        LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+    private final AmazonSageMaker client;
+    private String pipelineExecutionArn;
+    private String clientRequestToken;
+    private String pipelineStatus;
+
+    /**
+     * @param client SageMaker client used for every service call made by this helper
+     */
+    public PipelineUtils(AmazonSageMaker client) {
+        this.client = client;
+    }
+
+    /**
+     * Starts a pipeline execution and remembers its ARN and the request's client token.
+     *
+     * <p>Any exception raised while starting is logged (with its stack trace) and not rethrown.
+     *
+     * @param request the start request passed to the client unchanged
+     * @return {@link TaskConstants#EXIT_CODE_SUCCESS} when the client call returned,
+     *         {@link TaskConstants#EXIT_CODE_FAILURE} when it threw
+     */
+    public int startPipelineExecution(StartPipelineExecutionRequest request) {
+        try {
+            StartPipelineExecutionResult result = client.startPipelineExecution(request);
+            pipelineExecutionArn = result.getPipelineExecutionArn();
+            clientRequestToken = request.getClientRequestToken();
+            logger.info("Start pipeline: {} success", pipelineExecutionArn);
+            return TaskConstants.EXIT_CODE_SUCCESS;
+        } catch (Exception e) {
+            // The trailing throwable argument makes SLF4J log the stack trace as well as the message.
+            logger.error("Start pipeline error: {}", e.getMessage(), e);
+            return TaskConstants.EXIT_CODE_FAILURE;
+        }
+    }
+
+    /**
+     * Requests a stop of the remembered pipeline execution, reusing the remembered client token.
+     *
+     * <p>Any exception raised by the stop call is logged (with its stack trace) and not rethrown.
+     */
+    public void stopPipelineExecution() {
+        StopPipelineExecutionRequest request = new StopPipelineExecutionRequest();
+        request.setPipelineExecutionArn(pipelineExecutionArn);
+        request.setClientRequestToken(clientRequestToken);
+
+        try {
+            StopPipelineExecutionResult result = client.stopPipelineExecution(request);
+            logger.info("Stop pipeline: {} success", result.getPipelineExecutionArn());
+        } catch (Exception e) {
+            logger.error("Stop pipeline error: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Polls the remembered execution until its status is no longer "Executing", logging the
+     * execution steps on every round and pausing for
+     * {@link SagemakerConstants#CHECK_PIPELINE_EXECUTION_STATUS_INTERVAL} between polls.
+     *
+     * <p>Exceptions thrown by the describe/list calls are not caught here and reach the caller.
+     *
+     * @return {@link TaskConstants#EXIT_CODE_SUCCESS} only when the final status is "Succeeded";
+     *         {@link TaskConstants#EXIT_CODE_FAILURE} for every other final status, including a
+     *         missing one
+     */
+    public int checkPipelineExecutionStatus() {
+        describePipelineExecution();
+        // Constant-first equals: a response without a status ends the loop instead of throwing.
+        while (STATUS_EXECUTING.equals(pipelineStatus)) {
+            logger.info("check Pipeline Steps running");
+            listPipelineExecutionSteps();
+            ThreadUtils.sleep(SagemakerConstants.CHECK_PIPELINE_EXECUTION_STATUS_INTERVAL);
+            describePipelineExecution();
+        }
+
+        int exitStatusCode = STATUS_SUCCEEDED.equals(pipelineStatus)
+            ? TaskConstants.EXIT_CODE_SUCCESS
+            : TaskConstants.EXIT_CODE_FAILURE;
+        logger.info("exit : {}", exitStatusCode);
+        logger.info("PipelineExecutionStatus : {}", pipelineStatus);
+        return exitStatusCode;
+    }
+
+    /** Fetches the current status of the remembered execution into {@code pipelineStatus} and logs it. */
+    private void describePipelineExecution() {
+        DescribePipelineExecutionRequest request = new DescribePipelineExecutionRequest();
+        request.setPipelineExecutionArn(pipelineExecutionArn);
+        DescribePipelineExecutionResult result = client.describePipelineExecution(request);
+        pipelineStatus = result.getPipelineExecutionStatus();
+        logger.info("PipelineExecutionStatus: {}", pipelineStatus);
+    }
+
+    /**
+     * Logs up to {@link SagemakerConstants#PIPELINE_MAX_RESULTS} steps of the remembered execution,
+     * in the reverse of the order returned by the service.
+     */
+    private void listPipelineExecutionSteps() {
+        ListPipelineExecutionStepsRequest request = new ListPipelineExecutionStepsRequest();
+        request.setPipelineExecutionArn(pipelineExecutionArn);
+        request.setMaxResults(SagemakerConstants.PIPELINE_MAX_RESULTS);
+        ListPipelineExecutionStepsResult result = client.listPipelineExecutionSteps(request);
+        List<PipelineExecutionStep> steps = result.getPipelineExecutionSteps();
+        logger.info("pipelineStepsStatus: ");
+        if (steps == null) {
+            // NOTE(review): guards against a result without a step list; confirm whether the SDK can return null here.
+            return;
+        }
+        // NOTE(review): presumably the service lists newest steps first, so reversing gives chronological logs - confirm.
+        Collections.reverse(steps);
+        for (PipelineExecutionStep step : steps) {
+            logger.info(step.toString());
+        }
+    }
+
+    /**
+     * @return the ARN captured by the last successful start, or {@code null} if none succeeded yet
+     */
+    public String getPipelineExecutionArn() {
+        return pipelineExecutionArn;
+    }
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerConstants.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerConstants.java
new file mode 100644
index 0000000000..143abbb644
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+/**
+ * Constants shared by the SageMaker task plugin. Not instantiable.
+ */
+public class SagemakerConstants {
+    // NOTE(review): presumably the delay in milliseconds between two status checks — confirm against its use in PipelineUtils
+    public static final int CHECK_PIPELINE_EXECUTION_STATUS_INTERVAL = 5000;
+    // value passed as MaxResults when the steps of a pipeline execution are listed
+    public static final int PIPELINE_MAX_RESULTS = 100;
+
+    // private constructor that always throws, so no instance can be created
+    private SagemakerConstants() {
+        throw new IllegalStateException("Utility class");
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
new file mode 100644
index 0000000000..3b33eded1a
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@Getter
+@Setter
+@ToString
+public class SagemakerParameters extends AbstractParameters {
+
+    /**
+     * JSON text of the request; SagemakerTask deserializes it into a StartPipelineExecutionRequest
+     */
+    private String sagemakerRequestJson;
+
+    /**
+     * The parameters count as valid as soon as a non-empty request JSON is present.
+     *
+     * @return true when sagemakerRequestJson is neither null nor the empty string
+     */
+    @Override
+    public boolean checkParameters() {
+        boolean requestJsonMissing = StringUtils.isEmpty(sagemakerRequestJson);
+        return !requestJsonMissing;
+    }
+
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
new file mode 100644
index 0000000000..1431da1e92
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import static 
com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static 
com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static 
com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static 
com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import java.util.Map;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.sagemaker.AmazonSageMaker;
+import com.amazonaws.services.sagemaker.AmazonSageMakerClientBuilder;
+import com.amazonaws.services.sagemaker.model.StartPipelineExecutionRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+/**
+ * Task executor that starts an Amazon SageMaker pipeline execution from a JSON request
+ * and reports the resulting exit status code.
+ */
+public class SagemakerTask extends AbstractTaskExecutor {
+
+    /**
+     * Mapper used to turn the request JSON into a StartPipelineExecutionRequest:
+     * unknown properties are ignored and property names are matched in UpperCamelCase.
+     */
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    /**
+     * SageMaker parameters, populated by init()
+     */
+    private SagemakerParameters parameters;
+    /**
+     * helper wrapping the SageMaker client; stays null until handleStartPipeline() has created it
+     */
+    private PipelineUtils utils;
+
+    public SagemakerTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+
+        this.taskExecutionContext = taskExecutionContext;
+    }
+
+    /**
+     * Parses the task params into SagemakerParameters and validates them.
+     *
+     * @throws SagemakerTaskException if no parameter object could be obtained or checkParameters() fails
+     */
+    @Override
+    public void init() {
+        logger.info("Sagemaker task params {}", taskExecutionContext.getTaskParams());
+
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SagemakerParameters.class);
+
+        // guard against a null parse result so the failure is reported as invalid params
+        // instead of a bare NullPointerException
+        if (parameters == null || !parameters.checkParameters()) {
+            throw new SagemakerTaskException("Sagemaker task params is not valid");
+        }
+    }
+
+    /**
+     * Runs the pipeline and records the exit status code. Any exception is recorded as
+     * EXIT_CODE_FAILURE and rethrown wrapped in a SagemakerTaskException.
+     *
+     * @throws SagemakerTaskException if starting or tracking the pipeline throws
+     */
+    @Override
+    public void handle() throws SagemakerTaskException {
+        try {
+            int exitStatusCode = handleStartPipeline();
+            setExitStatusCode(exitStatusCode);
+        } catch (Exception e) {
+            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+            throw new SagemakerTaskException("SageMaker task error", e);
+        }
+    }
+
+    /**
+     * Stops the pipeline execution, if one may have been started by this task.
+     *
+     * @param cancelApplication not used by this implementation
+     */
+    @Override
+    public void cancelApplication(boolean cancelApplication) {
+        // utils only exists once handleStartPipeline() has built the client; before that there is nothing to stop
+        if (utils != null) {
+            // stop pipeline
+            utils.stopPipelineExecution();
+        }
+    }
+
+    /**
+     * Builds the request, creates the SageMaker client, starts the pipeline and, when the
+     * start succeeded, hands over to the status check.
+     *
+     * @return the exit status code of the start call, or of the status check when the start succeeded
+     * @throws SagemakerTaskException if the request JSON cannot be parsed or the client cannot be created
+     */
+    public int handleStartPipeline() {
+        int exitStatusCode;
+        StartPipelineExecutionRequest request = createStartPipelineRequest();
+
+        try {
+            AmazonSageMaker client = createClient();
+            utils = new PipelineUtils(client);
+        } catch (Exception e) {
+            throw new SagemakerTaskException("can not connect aws ", e);
+        }
+
+        // Start pipeline
+        exitStatusCode = utils.startPipelineExecution(request);
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            // record the execution ARN as app id only after the start call, when it is known
+            setAppIds(utils.getPipelineExecutionArn());
+            // Keep checking the health status
+            exitStatusCode = utils.checkPipelineExecutionStatus();
+        }
+        return exitStatusCode;
+    }
+
+    /**
+     * Resolves parameter placeholders in the configured request JSON and deserializes the
+     * result into a StartPipelineExecutionRequest.
+     *
+     * @return the request to send to SageMaker
+     * @throws SagemakerTaskException if the JSON cannot be deserialized
+     */
+    public StartPipelineExecutionRequest createStartPipelineRequest() throws SagemakerTaskException {
+
+        String requestJson = parameters.getSagemakerRequestJson();
+        requestJson = parseRequestJson(requestJson);
+
+        StartPipelineExecutionRequest startPipelineRequest;
+        try {
+            startPipelineRequest = objectMapper.readValue(requestJson, StartPipelineExecutionRequest.class);
+        } catch (Exception e) {
+            logger.error("can not parse SagemakerRequestJson from json: {}", requestJson);
+            throw new SagemakerTaskException("can not parse SagemakerRequestJson ", e);
+        }
+
+        logger.info("Sagemaker task create StartPipelineRequest: {}", startPipelineRequest);
+        return startPipelineRequest;
+    }
+
+    @Override
+    public SagemakerParameters getParameters() {
+        return parameters;
+    }
+
+    /**
+     * Replaces parameter placeholders in the request JSON with the values from the
+     * prepared params map of the execution context.
+     */
+    private String parseRequestJson(String requestJson) {
+        // combining local and global parameters
+        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
+        return ParameterUtils.convertParameterPlaceholders(requestJson, ParamUtils.convert(paramsMap));
+    }
+
+    /**
+     * Creates a SageMaker client from the access key, secret key and region read via PropertyUtils.
+     */
+    private AmazonSageMaker createClient() {
+        final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+        final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+        final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
+        final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
+        final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
+        // create a SageMaker client
+        return AmazonSageMakerClientBuilder.standard()
+            .withCredentials(awsCredentialsProvider)
+            .withRegion(awsRegion)
+            .build();
+    }
+
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java
new file mode 100644
index 0000000000..9e88471cb6
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+/**
+ * TaskChannel implementation for the SageMaker task type: creates SagemakerTask
+ * instances and parses their parameters.
+ */
+public class SagemakerTaskChannel implements TaskChannel {
+
+    // empty implementation: this channel does nothing on cancel
+    @Override
+    public void cancelApplication(boolean status) {
+
+    }
+
+    // creates a new SagemakerTask bound to the given execution context
+    @Override
+    public SagemakerTask createTask(TaskExecutionContext taskRequest) {
+        return new SagemakerTask(taskRequest);
+    }
+
+    // deserializes the node's task params JSON into SagemakerParameters
+    @Override
+    public AbstractParameters parseParameters(ParametersNode parametersNode) {
+        return JSONUtils.parseObject(parametersNode.getTaskParams(), 
SagemakerParameters.class);
+    }
+
+    // always returns null: no resource information is produced for this task type
+    @Override
+    public ResourceParametersHelper getResources(String parameters) {
+        return null;
+    }
+
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannelFactory.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannelFactory.java
new file mode 100644
index 0000000000..0c4afec462
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannelFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for the SageMaker task channel, registered as a TaskChannelFactory
+ * service through the AutoService annotation.
+ */
+@AutoService(TaskChannelFactory.class)
+public class SagemakerTaskChannelFactory implements TaskChannelFactory {
+    // returns a new SagemakerTaskChannel on every call
+    @Override
+    public TaskChannel create() {
+        return new SagemakerTaskChannel();
+    }
+
+    // name under which this task type is exposed; the UI sends the same 'SAGEMAKER' string as taskType
+    @Override
+    public String getName() {
+        return "SAGEMAKER";
+    }
+
+    // this plugin declares no plugin params
+    @Override
+    public List<PluginParams> getParams() {
+        return Collections.emptyList();
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskException.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskException.java
new file mode 100644
index 0000000000..92beb5c5c2
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+/**
+ * Unchecked exception thrown by the SageMaker task plugin, e.g. for invalid task
+ * params, an unparsable request JSON or a failure while running the task.
+ */
+public class SagemakerTaskException extends RuntimeException {
+
+    // no message, no cause
+    public SagemakerTaskException() {
+        super();
+    }
+
+    // message only
+    public SagemakerTaskException(String message) {
+        super(message);
+    }
+
+    // message plus the underlying cause
+    public SagemakerTaskException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
new file mode 100644
index 0000000000..a7dcdca7bd
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import 
org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.sagemaker.AmazonSageMaker;
+import com.amazonaws.services.sagemaker.model.DescribePipelineExecutionResult;
+import com.amazonaws.services.sagemaker.model.ListPipelineExecutionStepsResult;
+import com.amazonaws.services.sagemaker.model.PipelineExecutionStep;
+import com.amazonaws.services.sagemaker.model.StartPipelineExecutionRequest;
+import com.amazonaws.services.sagemaker.model.StartPipelineExecutionResult;
+import com.amazonaws.services.sagemaker.model.StopPipelineExecutionResult;
+
+/**
+ * Unit tests for SagemakerTask and PipelineUtils that run against a mocked
+ * AmazonSageMaker client, so no AWS call is made.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({JSONUtils.class, PropertyUtils.class,})
+@PowerMockIgnore({"javax.*"})
+@SuppressStaticInitializationFor("org.apache.dolphinscheduler.spi.utils.PropertyUtils")
+public class SagemakerTaskTest {
+
+    private final String pipelineExecutionArn = "test-pipeline-arn";
+    private SagemakerTask sagemakerTask;
+    private AmazonSageMaker client;
+    private PipelineUtils pipelineUtils;
+
+    /**
+     * Builds an initialized SagemakerTask from the JSON fixture and a PipelineUtils
+     * whose client answers every call with canned results.
+     */
+    @Before
+    public void before() {
+        PowerMockito.mockStatic(PropertyUtils.class);
+        String parameters = buildParameters();
+        TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
+        Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        sagemakerTask = new SagemakerTask(taskExecutionContext);
+        sagemakerTask.init();
+        client = mock(AmazonSageMaker.class);
+        pipelineUtils = new PipelineUtils(client);
+
+        StartPipelineExecutionResult startPipelineExecutionResult = mock(StartPipelineExecutionResult.class);
+        when(startPipelineExecutionResult.getPipelineExecutionArn()).thenReturn(pipelineExecutionArn);
+
+        StopPipelineExecutionResult stopPipelineExecutionResult = mock(StopPipelineExecutionResult.class);
+        when(stopPipelineExecutionResult.getPipelineExecutionArn()).thenReturn(pipelineExecutionArn);
+
+        // first status query reports "Executing", every later one "Succeeded"
+        DescribePipelineExecutionResult describePipelineExecutionResult = mock(DescribePipelineExecutionResult.class);
+        when(describePipelineExecutionResult.getPipelineExecutionStatus()).thenReturn("Executing", "Succeeded");
+
+        ListPipelineExecutionStepsResult listPipelineExecutionStepsResult = mock(ListPipelineExecutionStepsResult.class);
+        PipelineExecutionStep pipelineExecutionStep = mock(PipelineExecutionStep.class);
+        List<PipelineExecutionStep> pipelineExecutionSteps = new ArrayList<>();
+        pipelineExecutionSteps.add(pipelineExecutionStep);
+        pipelineExecutionSteps.add(pipelineExecutionStep);
+
+        when(pipelineExecutionStep.toString()).thenReturn("Test Step1", "Test Step2");
+        when(listPipelineExecutionStepsResult.getPipelineExecutionSteps()).thenReturn(pipelineExecutionSteps);
+
+        when(client.startPipelineExecution(any())).thenReturn(startPipelineExecutionResult);
+        when(client.stopPipelineExecution(any())).thenReturn(stopPipelineExecutionResult);
+        when(client.describePipelineExecution(any())).thenReturn(describePipelineExecutionResult);
+        when(client.listPipelineExecutionSteps(any())).thenReturn(listPipelineExecutionStepsResult);
+
+    }
+
+    /**
+     * The request built from the fixture carries the values of SagemakerRequestJson.json.
+     */
+    @Test
+    public void testStartPipelineRequest() throws Exception {
+        StartPipelineExecutionRequest request = sagemakerTask.createStartPipelineRequest();
+        Assert.assertEquals("AbalonePipeline", request.getPipelineName());
+        Assert.assertEquals("test Pipeline", request.getPipelineExecutionDescription());
+        Assert.assertEquals("AbalonePipeline", request.getPipelineExecutionDisplayName());
+        Assert.assertEquals(Integer.valueOf(1), request.getParallelismConfiguration().getMaxParallelExecutionSteps());
+    }
+
+    /**
+     * Starting records the ARN returned by the client, the status check ends with exit
+     * code 0 once the mocked status becomes "Succeeded", and stopping does not throw.
+     */
+    @Test
+    public void testPipelineExecution() throws Exception {
+        pipelineUtils.startPipelineExecution(sagemakerTask.createStartPipelineRequest());
+        Assert.assertEquals(pipelineExecutionArn, pipelineUtils.getPipelineExecutionArn());
+        Assert.assertEquals(0, pipelineUtils.checkPipelineExecutionStatus());
+        pipelineUtils.stopPipelineExecution();
+    }
+
+    /**
+     * Reads SagemakerRequestJson.json from the test resources and wraps it in serialized
+     * SagemakerParameters.
+     */
+    private String buildParameters() {
+        SagemakerParameters parameters = new SagemakerParameters();
+        String sagemakerRequestJson;
+        try (InputStream i = this.getClass().getResourceAsStream("SagemakerRequestJson.json")) {
+            // JUnit assertion instead of the assert keyword, so the check also runs with JVM assertions disabled
+            Assert.assertNotNull("SagemakerRequestJson.json not found on the test classpath", i);
+            sagemakerRequestJson = IOUtils.toString(i, StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        parameters.setSagemakerRequestJson(sagemakerRequestJson);
+
+        return JSONUtils.toJsonString(parameters);
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/resources/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerRequestJson.json
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/resources/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerRequestJson.json
new file mode 100644
index 0000000000..9a1a28bfa1
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/resources/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerRequestJson.json
@@ -0,0 +1,9 @@
+{
+  "ParallelismConfiguration": {
+    "MaxParallelExecutionSteps": 1
+  },
+  "PipelineExecutionDescription": "test Pipeline",
+  "PipelineExecutionDisplayName": "AbalonePipeline",
+  "PipelineName": "AbalonePipeline",
+  "PipelineParameters": [ ]
+}
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/pom.xml 
b/dolphinscheduler-task-plugin/pom.xml
index f934cc5c83..a9b094b443 100644
--- a/dolphinscheduler-task-plugin/pom.xml
+++ b/dolphinscheduler-task-plugin/pom.xml
@@ -57,6 +57,7 @@
         <module>dolphinscheduler-task-openmldb</module>
         <module>dolphinscheduler-task-dvc</module>
         <module>dolphinscheduler-task-dinky</module>
+        <module>dolphinscheduler-task-sagemaker</module>
     </modules>
 
     <dependencyManagement>
diff --git a/dolphinscheduler-ui/public/images/task-icons/sagemaker.png 
b/dolphinscheduler-ui/public/images/task-icons/sagemaker.png
new file mode 100644
index 0000000000..9b8206e741
Binary files /dev/null and 
b/dolphinscheduler-ui/public/images/task-icons/sagemaker.png differ
diff --git a/dolphinscheduler-ui/public/images/task-icons/sagemaker_hover.png 
b/dolphinscheduler-ui/public/images/task-icons/sagemaker_hover.png
new file mode 100644
index 0000000000..270e9fe563
Binary files /dev/null and 
b/dolphinscheduler-ui/public/images/task-icons/sagemaker_hover.png differ
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index 3054129214..c0f2f25046 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -72,3 +72,4 @@ export { useMlflowModels } from './use-mlflow-models'
 export { useOpenmldb } from './use-openmldb'
 export { useDvc } from './use-dvc'
 export { useDinky } from './use-dinky'
+export { useSagemaker } from './use-sagemaker'
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sagemaker.ts
 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sagemaker.ts
new file mode 100644
index 0000000000..83e52b874e
--- /dev/null
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sagemaker.ts
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import type { IJsonItem } from '../types'
+import { useCustomParams } from '.'
+
+/**
+ * Builds the form items specific to the SageMaker task: a required JSON editor bound
+ * to `sagemakerRequestJson`, followed by the custom-parameter items for `localParams`.
+ *
+ * @param model form model, forwarded to useCustomParams
+ * @returns the form item definitions
+ */
+export function useSagemaker(model: { [field: string]: any }): IJsonItem[] {
+  return [
+    {
+      type: 'editor',
+      field: 'sagemakerRequestJson',
+      name: 'SagemakerRequestJson',
+      props: {
+        language: 'json'
+      },
+      validate: {
+        // validate while typing and when the editor loses focus
+        trigger: ['input', 'blur'],
+        required: true,
+        message: 'requestJson'
+      }
+    },
+    ...useCustomParams({ model, field: 'localParams', isSimple: false })
+  ]
+}
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 7a3878bc58..1d719a8e37 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -384,6 +384,10 @@ export function formatParams(data: INodeData): {
     taskParams.dvcStoreUrl = data.dvcStoreUrl
   }
 
+  if (data.taskType === 'SAGEMAKER') {
+    taskParams.sagemakerRequestJson = data.sagemakerRequestJson
+  }
+
   if (data.taskType === 'DINKY') {
     taskParams.address = data.address
     taskParams.taskId = data.taskId
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index 4240892eff..1d16aeef03 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -40,6 +40,7 @@ import { useMlflow } from './use-mlflow'
 import { useOpenmldb } from './use-openmldb'
 import { useDvc } from './use-dvc'
 import { useDinky } from './use-dinky'
+import { userSagemaker } from './use-sagemaker'
 
 export default {
   SHELL: useShell,
@@ -66,5 +67,6 @@ export default {
   MLFLOW: useMlflow,
   OPENMLDB: useOpenmldb,
   DVC: useDvc,
-  DINKY: useDinky
+  DINKY: useDinky,
+  SAGEMAKER: userSagemaker
 }
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
 
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
new file mode 100644
index 0000000000..fc3a3be0a6
--- /dev/null
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData, ITaskData } from '../types'
+
+/**
+ * Builds the reactive form model and the form item list of the SageMaker task node.
+ * The exported name keeps its `userSagemaker` spelling because tasks/index.ts imports
+ * it under exactly that name.
+ *
+ * @param projectCode code of the project, forwarded to the process-name and task-group fields
+ * @param from when 1, the task-type and process-name fields are added to the form
+ * @param readonly forwarded to the task-type field
+ * @param data existing task data; its id and processName feed the process-name field
+ * @returns `json`, the ordered form items, and `model`, the reactive model they bind to
+ */
+export function userSagemaker({
+  projectCode,
+  from = 0,
+  readonly,
+  data
+}: {
+  projectCode: number
+  from?: number
+  readonly?: boolean
+  data?: ITaskData
+}) {
+  const model = reactive({
+    name: '',
+    // must match the key this task is registered under ('SAGEMAKER')
+    taskType: 'SAGEMAKER',
+    flag: 'YES',
+    description: '',
+    timeoutFlag: false,
+    localParams: [],
+    environmentCode: null,
+    failRetryInterval: 1,
+    failRetryTimes: 0,
+    workerGroup: 'default',
+    delayTime: 0,
+    timeout: 30,
+    timeoutNotifyStrategy: ['WARN']
+  } as INodeData)
+
+  let extra: IJsonItem[] = []
+  if (from === 1) {
+    extra = [
+      Fields.useTaskType(model, readonly),
+      Fields.useProcessName({
+        model,
+        projectCode,
+        isCreate: !data?.id,
+        from,
+        processName: data?.processName
+      })
+    ]
+  }
+
+  return {
+    json: [
+      Fields.useName(from),
+      ...extra,
+      Fields.useRunFlag(),
+      Fields.useDescription(),
+      Fields.useTaskPriority(),
+      Fields.useWorkerGroup(),
+      Fields.useEnvironmentName(model, !model.id),
+      ...Fields.useTaskGroup(model, projectCode),
+      ...Fields.useFailed(),
+      Fields.useDelayTime(model),
+      ...Fields.useTimeoutAlarm(model),
+      ...Fields.useSagemaker(model),
+      Fields.usePreTasks()
+    ] as IJsonItem[],
+    model
+  }
+}
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 268d8635b8..9878258338 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -359,6 +359,7 @@ interface ITaskParams {
   address?: string
   taskId?: string
   online?: boolean
+  sagemakerRequestJson?: string
 }
 
 interface INodeData
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts 
b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index 1cc0e00c5b..b441c6229c 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -40,6 +40,7 @@ export type TaskType =
   | 'OPENMLDB'
   | 'DVC'
   | 'DINKY'
+  | 'SAGEMAKER'
 
 export const TASK_TYPES_MAP = {
   SHELL: {
@@ -128,5 +129,9 @@ export const TASK_TYPES_MAP = {
   DINKY: {
     alias: 'DINKY',
     helperLinkDisable: true
+  },
+  SAGEMAKER: {
+    alias: 'SageMaker',
+    helperLinkDisable: true
   }
 } as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } }
diff --git 
a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
 
b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index 90615483fd..c22dec8480 100644
--- 
a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++ 
b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -176,6 +176,9 @@ $bgLight: #ffffff;
     &.icon-dinky {
       background-image: url('/images/task-icons/dinky.png');
     }
+    &.icon-sagemaker {
+      background-image: url('/images/task-icons/sagemaker.png');
+    }
   }
 
   &:hover {
@@ -255,6 +258,9 @@ $bgLight: #ffffff;
       &.icon-dinky {
         background-image: url('/images/task-icons/dinky_hover.png');
       }
+      &.icon-sagemaker {
+        background-image: url('/images/task-icons/sagemaker_hover.png');
+      }
     }
   }
 }
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index 562e55b20b..49c6d44c7f 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -255,6 +255,7 @@ aws-java-sdk-s3-1.12.160.jar
 aws-java-sdk-kms-1.12.160.jar
 aws-java-sdk-emr-1.12.160.jar
 aws-java-sdk-core-1.12.160.jar
+aws-java-sdk-sagemaker-1.12.160.jar
 commons-text-1.8.jar
 httpasyncclient-4.1.4.jar
 httpcore-nio-4.4.14.jar

Reply via email to