This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5a3827eef4 [DSIP-19] Support sagemaker connections in the connection
center, as well as external connections to the connection center in sagemaker
tasks (#14976)
5a3827eef4 is described below
commit 5a3827eef49d85d6958d9b7c688706cbcf0a3260
Author: chenrj <[email protected]>
AuthorDate: Tue Oct 31 10:07:05 2023 +0800
[DSIP-19] Support sagemaker connections in the connection center, as well
as external connections to the connection center in sagemaker tasks (#14976)
* Refactoring `Sagemaker` task plugin with connections managed in
connection center.
---------
Co-authored-by: Eric Gao <[email protected]>
---
dolphinscheduler-bom/pom.xml | 1 +
.../dolphinscheduler-datasource-all/pom.xml | 5 +
.../dolphinscheduler-datasource-sagemaker}/pom.xml | 18 ++-
.../sagemaker/SagemakerClientWrapper.java | 65 ++++++++++
.../sagemaker/SagemakerDataSourceChannel.java | 37 ++++++
.../SagemakerDataSourceChannelFactory.java | 29 ++---
.../sagemaker/param/SagemakerConnectionParam.java | 28 ++---
.../param/SagemakerDataSourceParamDTO.java | 27 ++--
.../param/SagemakerDataSourceProcessor.java | 136 +++++++++++++++++++++
.../SagemakerDataSourceProcessorTest.java | 115 +++++++++++++++++
dolphinscheduler-datasource-plugin/pom.xml | 1 +
.../apache/dolphinscheduler/spi/enums/DbType.java | 4 +-
.../dolphinscheduler-task-api/pom.xml | 4 +
.../dolphinscheduler-task-sagemaker/pom.xml | 5 +
.../plugin/task/sagemaker/SagemakerParameters.java | 28 +++++
.../plugin/task/sagemaker/SagemakerTask.java | 33 +++--
.../task/sagemaker/SagemakerTaskChannel.java | 2 +-
...ers.java => SagemakerTaskExecutionContext.java} | 32 ++---
.../plugin/task/sagemaker/SagemakerTaskTest.java | 34 +++++-
.../plugin/task/zeppelin/ZeppelinParameters.java | 2 +
.../src/locales/en_US/datasource.ts | 12 +-
.../src/locales/zh_CN/datasource.ts | 10 +-
.../src/service/modules/data-source/types.ts | 2 +
.../src/views/datasource/list/detail.tsx | 9 +-
.../src/views/datasource/list/use-form.ts | 24 +++-
.../task/components/node/fields/use-datasource.ts | 5 +
.../projects/task/components/node/format-data.ts | 5 +
.../task/components/node/tasks/use-sagemaker.ts | 8 +-
.../views/projects/task/components/node/types.ts | 2 +
29 files changed, 583 insertions(+), 100 deletions(-)
diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml
index ccd6b3a4f7..77b9d0c0c8 100644
--- a/dolphinscheduler-bom/pom.xml
+++ b/dolphinscheduler-bom/pom.xml
@@ -120,6 +120,7 @@
<zeppelin-client.version>0.10.1</zeppelin-client.version>
<testcontainer.version>1.17.6</testcontainer.version>
<checker-qual.version>3.19.0</checker-qual.version>
+ <zeppelin-client.version>0.10.1</zeppelin-client.version>
</properties>
<dependencyManagement>
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
index 71905dea45..cb9c78f9d3 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
@@ -143,5 +143,10 @@
<artifactId>dolphinscheduler-datasource-doris</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-datasource-sagemaker</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/pom.xml
similarity index 80%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
copy to
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/pom.xml
index dfb769a46c..870abed97a 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/pom.xml
@@ -20,12 +20,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-task-plugin</artifactId>
+ <artifactId>dolphinscheduler-datasource-plugin</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
- <artifactId>dolphinscheduler-task-sagemaker</artifactId>
+ <artifactId>dolphinscheduler-datasource-sagemaker</artifactId>
<packaging>jar</packaging>
+ <name>${project.artifactId}</name>
<dependencies>
<dependency>
@@ -33,20 +34,17 @@
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-task-api</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-common</artifactId>
- <scope>provided</scope>
+ <artifactId>dolphinscheduler-datasource-api</artifactId>
+ <version>${project.version}</version>
</dependency>
+
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sagemaker</artifactId>
</dependency>
-
</dependencies>
+
</project>
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerClientWrapper.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerClientWrapper.java
new file mode 100644
index 0000000000..2996789221
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerClientWrapper.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.sagemaker;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.sagemaker.AmazonSageMaker;
+import com.amazonaws.services.sagemaker.AmazonSageMakerClientBuilder;
+import com.amazonaws.services.sagemaker.model.ListNotebookInstancesRequest;
+
+/**
+ * Wrapper around an AWS SDK {@link AmazonSageMaker} client that is built from static
+ * credentials and used to probe whether those credentials can reach SageMaker.
+ */
+@Slf4j
+public class SagemakerClientWrapper implements AutoCloseable {
+
+    private final AmazonSageMaker amazonSageMaker;
+
+    /**
+     * Builds a SageMaker client for the given credentials and region.
+     *
+     * @param accessKey       AWS access key, must not be null
+     * @param secretAccessKey AWS secret access key, must not be null
+     * @param region          AWS region name, must not be null
+     * @throws NullPointerException if any argument is null
+     */
+    public SagemakerClientWrapper(String accessKey, String secretAccessKey, String region) {
+        checkNotNull(accessKey, "sagemaker accessKey cannot be null");
+        checkNotNull(secretAccessKey, "sagemaker secretAccessKey cannot be null");
+        checkNotNull(region, "sagemaker region cannot be null");
+
+        final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(accessKey, secretAccessKey);
+        final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
+        // create a SageMaker client
+        amazonSageMaker = AmazonSageMakerClientBuilder.standard()
+                .withCredentials(awsCredentialsProvider)
+                .withRegion(region)
+                .build();
+    }
+
+    /**
+     * Issues a {@code ListNotebookInstances} request as a connectivity probe.
+     *
+     * @return {@code true} if the request completes, {@code false} if it throws
+     */
+    public boolean checkConnect() {
+        try {
+            // If listing notebook instances fails, an exception will be thrown directly
+            amazonSageMaker.listNotebookInstances(new ListNotebookInstancesRequest());
+            log.info("sagemaker client connects to server successfully");
+            return true;
+        } catch (Exception e) {
+            // Keep the cause in the log; callers only see the boolean result.
+            log.error("sagemaker client failed to connect to the server", e);
+            return false;
+        }
+    }
+
+    /**
+     * Shuts down the underlying SDK client so resources it holds are released.
+     */
+    @Override
+    public void close() {
+        amazonSageMaker.shutdown();
+    }
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannel.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannel.java
new file mode 100644
index 0000000000..03b96cf8a3
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.sagemaker;
+
+import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
+import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+/**
+ * {@link DataSourceChannel} for SageMaker. Neither client type is available for this
+ * datasource: both factory methods always throw.
+ */
+public class SagemakerDataSourceChannel implements DataSourceChannel {
+
+    /**
+     * Not supported for SageMaker.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public AdHocDataSourceClient createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
+        throw new UnsupportedOperationException("Sagemaker AdHocDataSourceClient is not supported");
+    }
+
+    /**
+     * Not supported for SageMaker.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public PooledDataSourceClient createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
+        // The message must name the pooled client; it previously repeated the ad-hoc one.
+        throw new UnsupportedOperationException("Sagemaker PooledDataSourceClient is not supported");
+    }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannelFactory.java
similarity index 60%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
copy to
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannelFactory.java
index 3b33eded1a..04ab93f36f 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannelFactory.java
@@ -15,29 +15,24 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.sagemaker;
+package org.apache.dolphinscheduler.plugin.datasource.sagemaker;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
-import org.apache.commons.lang3.StringUtils;
+import com.google.auto.service.AutoService;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+@AutoService(DataSourceChannelFactory.class)
+public class SagemakerDataSourceChannelFactory implements
DataSourceChannelFactory {
-@Getter
-@Setter
-@ToString
-public class SagemakerParameters extends AbstractParameters {
-
- /**
- * request script
- */
- private String sagemakerRequestJson;
+ @Override
+ public DataSourceChannel create() {
+ return new SagemakerDataSourceChannel();
+ }
@Override
- public boolean checkParameters() {
- return StringUtils.isNotEmpty(sagemakerRequestJson);
+ public String getName() {
+ return "sagemaker";
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerConnectionParam.java
similarity index 59%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
copy to
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerConnectionParam.java
index 3b33eded1a..19bf72eb56 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerConnectionParam.java
@@ -15,29 +15,21 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.sagemaker;
+package org.apache.dolphinscheduler.plugin.datasource.sagemaker.param;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
-import org.apache.commons.lang3.StringUtils;
+import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import com.fasterxml.jackson.annotation.JsonInclude;
-@Getter
-@Setter
-@ToString
-public class SagemakerParameters extends AbstractParameters {
+@Data
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class SagemakerConnectionParam implements ConnectionParam {
- /**
- * request script
- */
- private String sagemakerRequestJson;
+ protected String userName;
- @Override
- public boolean checkParameters() {
- return StringUtils.isNotEmpty(sagemakerRequestJson);
- }
+ protected String password;
+ protected String awsRegion;
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceParamDTO.java
similarity index 61%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
copy to
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceParamDTO.java
index 3b33eded1a..902b2b1d3b 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceParamDTO.java
@@ -15,29 +15,20 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.sagemaker;
+package org.apache.dolphinscheduler.plugin.datasource.sagemaker.param;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.spi.enums.DbType;
-import org.apache.commons.lang3.StringUtils;
+import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+@Data
+public class SagemakerDataSourceParamDTO extends BaseDataSourceParamDTO {
-@Getter
-@Setter
-@ToString
-public class SagemakerParameters extends AbstractParameters {
-
- /**
- * request script
- */
- private String sagemakerRequestJson;
+ protected String awsRegion;
@Override
- public boolean checkParameters() {
- return StringUtils.isNotEmpty(sagemakerRequestJson);
+ public DbType getType() {
+ return DbType.SAGEMAKER;
}
-
}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceProcessor.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceProcessor.java
new file mode 100644
index 0000000000..6a534a5f0e
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceProcessor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.sagemaker.param;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.sagemaker.SagemakerClientWrapper;
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.text.MessageFormat;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * {@link DataSourceProcessor} for {@link DbType#SAGEMAKER}. Converts between the JSON, DTO and
+ * connection-param representations of a SageMaker connection and checks connectivity through
+ * {@link SagemakerClientWrapper}. The JDBC-oriented methods return empty or null values.
+ */
+@AutoService(DataSourceProcessor.class)
+@Slf4j
+public class SagemakerDataSourceProcessor implements DataSourceProcessor {
+
+    /** Parses {@code paramJson} into a {@link SagemakerDataSourceParamDTO}. */
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, SagemakerDataSourceParamDTO.class);
+    }
+
+    /**
+     * Requires user name, password and AWS region to all be non-empty.
+     *
+     * @throws IllegalArgumentException if any of the three is empty
+     */
+    @Override
+    public void checkDatasourceParam(BaseDataSourceParamDTO datasourceParamDTO) {
+        SagemakerDataSourceParamDTO paramDTO = (SagemakerDataSourceParamDTO) datasourceParamDTO;
+        boolean complete = StringUtils.isNotEmpty(paramDTO.getUserName())
+                && StringUtils.isNotEmpty(paramDTO.getPassword())
+                && StringUtils.isNotEmpty(paramDTO.getAwsRegion());
+        if (!complete) {
+            throw new IllegalArgumentException("sagemaker datasource param is not valid");
+        }
+    }
+
+    /**
+     * Builds an id of the form {@code descp@userName@password@awsRegion}, with each of the
+     * three connection values passed through {@link PasswordUtils#encodePassword}.
+     */
+    @Override
+    public String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) {
+        SagemakerConnectionParam param = (SagemakerConnectionParam) connectionParam;
+        String encodedUserName = PasswordUtils.encodePassword(param.getUserName());
+        String encodedPassword = PasswordUtils.encodePassword(param.getPassword());
+        String encodedRegion = PasswordUtils.encodePassword(param.getAwsRegion());
+        return MessageFormat.format("{0}@{1}@{2}@{3}", dbType.getDescp(),
+                encodedUserName, encodedPassword, encodedRegion);
+    }
+
+    /** Parses the stored connection JSON and copies its fields into a new DTO. */
+    @Override
+    public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
+        SagemakerConnectionParam source = (SagemakerConnectionParam) createConnectionParams(connectionJson);
+
+        SagemakerDataSourceParamDTO target = new SagemakerDataSourceParamDTO();
+        target.setUserName(source.getUserName());
+        target.setPassword(source.getPassword());
+        target.setAwsRegion(source.getAwsRegion());
+        return target;
+    }
+
+    /** Copies user name, password and AWS region from the DTO into a new connection param. */
+    @Override
+    public SagemakerConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
+        SagemakerDataSourceParamDTO source = (SagemakerDataSourceParamDTO) datasourceParam;
+
+        SagemakerConnectionParam target = new SagemakerConnectionParam();
+        target.setUserName(source.getUserName());
+        target.setPassword(source.getPassword());
+        target.setAwsRegion(source.getAwsRegion());
+        return target;
+    }
+
+    /** Parses {@code connectionJson} into a {@link SagemakerConnectionParam}. */
+    @Override
+    public ConnectionParam createConnectionParams(String connectionJson) {
+        return JSONUtils.parseObject(connectionJson, SagemakerConnectionParam.class);
+    }
+
+    /** @return an empty string */
+    @Override
+    public String getDatasourceDriver() {
+        return "";
+    }
+
+    /** @return an empty string */
+    @Override
+    public String getValidationQuery() {
+        return "";
+    }
+
+    /** @return an empty string */
+    @Override
+    public String getJdbcUrl(ConnectionParam connectionParam) {
+        return "";
+    }
+
+    /** @return always {@code null} */
+    @Override
+    public Connection getConnection(ConnectionParam connectionParam) {
+        return null;
+    }
+
+    /**
+     * Creates a {@link SagemakerClientWrapper} from the connection param (user name, password
+     * and region, in that order) and returns the result of its connectivity probe.
+     *
+     * @return the probe result, or {@code false} if creating or using the client throws
+     */
+    @Override
+    public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) {
+        SagemakerConnectionParam param = (SagemakerConnectionParam) connectionParam;
+        try (
+                SagemakerClientWrapper client = new SagemakerClientWrapper(
+                        param.getUserName(), param.getPassword(), param.getAwsRegion())) {
+            return client.checkConnect();
+        } catch (Exception e) {
+            log.error("sagemaker client failed to connect to the server", e);
+            return false;
+        }
+    }
+
+    @Override
+    public DbType getDbType() {
+        return DbType.SAGEMAKER;
+    }
+
+    /** @return a new processor instance */
+    @Override
+    public DataSourceProcessor create() {
+        return new SagemakerDataSourceProcessor();
+    }
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java
new file mode 100644
index 0000000000..424f478699
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.sagemaker;
+
+import
org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerConnectionParam;
+import
org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerDataSourceParamDTO;
+import
org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerDataSourceProcessor;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@link SagemakerDataSourceProcessor}: parameter validation, unique-id
+ * formatting, JSON/DTO/connection-param conversion and the connectivity check.
+ */
+@ExtendWith(MockitoExtension.class)
+public class SagemakerDataSourceProcessorTest {
+
+    private SagemakerDataSourceProcessor sagemakerDataSourceProcessor;
+
+    private String connectJson =
+            "{\"userName\":\"access key\",\"password\":\"secret access key\",\"awsRegion\":\"region\"}";
+
+    @BeforeEach
+    public void init() {
+        sagemakerDataSourceProcessor = new SagemakerDataSourceProcessor();
+    }
+
+    /** Validation must keep failing until user name, password and region are all set. */
+    @Test
+    void testCheckDatasourceParam() {
+        SagemakerDataSourceParamDTO paramDTO = new SagemakerDataSourceParamDTO();
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> sagemakerDataSourceProcessor.checkDatasourceParam(paramDTO));
+
+        paramDTO.setUserName("access key");
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> sagemakerDataSourceProcessor.checkDatasourceParam(paramDTO));
+
+        paramDTO.setPassword("secret access key");
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> sagemakerDataSourceProcessor.checkDatasourceParam(paramDTO));
+
+        paramDTO.setAwsRegion("region");
+        Assertions.assertDoesNotThrow(
+                () -> sagemakerDataSourceProcessor.checkDatasourceParam(paramDTO));
+    }
+
+    @Test
+    void testGetDatasourceUniqueId() {
+        SagemakerConnectionParam connectionParam = new SagemakerConnectionParam();
+        connectionParam.setUserName("access key");
+        connectionParam.setPassword("secret access key");
+        connectionParam.setAwsRegion("region");
+
+        String uniqueId = sagemakerDataSourceProcessor.getDatasourceUniqueId(connectionParam, DbType.SAGEMAKER);
+        Assertions.assertEquals("sagemaker@access key@secret access key@region", uniqueId);
+    }
+
+    @Test
+    void testCreateDatasourceParamDTO() {
+        SagemakerDataSourceParamDTO paramDTO =
+                (SagemakerDataSourceParamDTO) sagemakerDataSourceProcessor.createDatasourceParamDTO(connectJson);
+        Assertions.assertEquals("access key", paramDTO.getUserName());
+        Assertions.assertEquals("secret access key", paramDTO.getPassword());
+        Assertions.assertEquals("region", paramDTO.getAwsRegion());
+    }
+
+    @Test
+    void testCreateConnectionParams() {
+        SagemakerDataSourceParamDTO paramDTO =
+                (SagemakerDataSourceParamDTO) sagemakerDataSourceProcessor.createDatasourceParamDTO(connectJson);
+        SagemakerConnectionParam connectionParam = sagemakerDataSourceProcessor.createConnectionParams(paramDTO);
+        Assertions.assertEquals("access key", connectionParam.getUserName());
+        Assertions.assertEquals("secret access key", connectionParam.getPassword());
+        Assertions.assertEquals("region", connectionParam.getAwsRegion());
+    }
+
+    /** Expects false with the placeholder credentials, then true once the wrapper is mocked. */
+    @Test
+    void testTestConnection() {
+        SagemakerDataSourceParamDTO paramDTO =
+                (SagemakerDataSourceParamDTO) sagemakerDataSourceProcessor.createDatasourceParamDTO(connectJson);
+        SagemakerConnectionParam connectionParam = sagemakerDataSourceProcessor.createConnectionParams(paramDTO);
+        Assertions.assertFalse(sagemakerDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+
+        try (
+                MockedConstruction<SagemakerClientWrapper> mockedClientWrapper =
+                        Mockito.mockConstruction(SagemakerClientWrapper.class,
+                                (mock, context) -> Mockito.when(mock.checkConnect()).thenReturn(true))) {
+            Assertions.assertTrue(sagemakerDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+        }
+    }
+}
diff --git a/dolphinscheduler-datasource-plugin/pom.xml
b/dolphinscheduler-datasource-plugin/pom.xml
index 91882bb0b6..fb08aa7f6d 100644
--- a/dolphinscheduler-datasource-plugin/pom.xml
+++ b/dolphinscheduler-datasource-plugin/pom.xml
@@ -53,6 +53,7 @@
<module>dolphinscheduler-datasource-snowflake</module>
<module>dolphinscheduler-datasource-vertica</module>
<module>dolphinscheduler-datasource-doris</module>
+ <module>dolphinscheduler-datasource-sagemaker</module>
</modules>
<dependencyManagement>
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
index 967eec3b86..2dd8958924 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
@@ -52,8 +52,8 @@ public enum DbType {
VERTICA(21, "vertica"),
HANA(22, "hana"),
DORIS(23, "doris"),
- ZEPPELIN(24, "zeppelin");
-
+ ZEPPELIN(24, "zeppelin"),
+ SAGEMAKER(25, "sagemaker");
private static final Map<Integer, DbType> DB_TYPE_MAP =
Arrays.stream(DbType.values()).collect(toMap(DbType::getCode,
Functions.identity()));
@EnumValue
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
index 85d8b92f82..b115a1b6d1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
@@ -293,5 +293,9 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
index dfb769a46c..7caaf28641 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
@@ -47,6 +47,11 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sagemaker</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-datasource-all</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
index 3b33eded1a..4d2c1cace4 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
@@ -17,27 +17,55 @@
package org.apache.dolphinscheduler.plugin.task.sagemaker;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.commons.lang3.StringUtils;
+import java.util.Objects;
+
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
@Getter
@Setter
@ToString
+@Slf4j
public class SagemakerParameters extends AbstractParameters {
/**
* request script
*/
private String sagemakerRequestJson;
+ private String username;
+ private String password;
+ private String awsRegion;
+ private int datasource;
+ private String type;
@Override
public boolean checkParameters() {
return StringUtils.isNotEmpty(sagemakerRequestJson);
}
+ public SagemakerTaskExecutionContext
generateExtendedContext(ResourceParametersHelper parametersHelper) {
+ DataSourceParameters dataSourceParameters =
+ (DataSourceParameters)
parametersHelper.getResourceParameters(ResourceType.DATASOURCE, datasource);
+ SagemakerTaskExecutionContext sagemakerTaskExecutionContext = new
SagemakerTaskExecutionContext();
+ sagemakerTaskExecutionContext.setConnectionParams(
+ Objects.nonNull(dataSourceParameters) ?
dataSourceParameters.getConnectionParams() : null);
+ return sagemakerTaskExecutionContext;
+ }
+
+ @Override
+ public ResourceParametersHelper getResources() {
+ ResourceParametersHelper resources = super.getResources();
+ resources.put(ResourceType.DATASOURCE, datasource);
+ return resources;
+ }
+
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
index 0e8a296151..d04f5a3fac 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
@@ -23,13 +23,15 @@ import static
com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN
import static
com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerConnectionParam;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang3.StringUtils;
@@ -64,14 +66,16 @@ public class SagemakerTask extends AbstractRemoteTask {
*/
private SagemakerParameters parameters;
- private final AmazonSageMaker client;
- private final PipelineUtils utils;
+ private AmazonSageMaker client;
+ private PipelineUtils utils;
private PipelineUtils.PipelineId pipelineId;
+ private SagemakerConnectionParam sagemakerConnectionParam;
+ private SagemakerTaskExecutionContext sagemakerTaskExecutionContext;
+ private TaskExecutionContext taskExecutionContext;
public SagemakerTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
- client = createClient();
- utils = new PipelineUtils();
+ this.taskExecutionContext = taskExecutionContext;
}
@Override
@@ -83,15 +87,24 @@ public class SagemakerTask extends AbstractRemoteTask {
public void init() {
parameters = JSONUtils.parseObject(taskRequest.getTaskParams(),
SagemakerParameters.class);
-
- log.info("Initialize Sagemaker task params {}",
JSONUtils.toPrettyJsonString(parameters));
if (parameters == null) {
throw new SagemakerTaskException("Sagemaker task params is empty");
}
if (!parameters.checkParameters()) {
throw new SagemakerTaskException("Sagemaker task params is not
valid");
}
+ sagemakerTaskExecutionContext =
+
parameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+ sagemakerConnectionParam =
+ (SagemakerConnectionParam)
DataSourceUtils.buildConnectionParams(DbType.valueOf(parameters.getType()),
+ sagemakerTaskExecutionContext.getConnectionParams());
+ parameters.setUsername(sagemakerConnectionParam.getUserName());
+ parameters.setPassword(sagemakerConnectionParam.getPassword());
+ parameters.setAwsRegion(sagemakerConnectionParam.getAwsRegion());
+ log.info("Initialize Sagemaker task params {}",
JSONUtils.toPrettyJsonString(parameters));
+ client = createClient();
+ utils = new PipelineUtils();
}
@Override
@@ -170,9 +183,9 @@ public class SagemakerTask extends AbstractRemoteTask {
}
protected AmazonSageMaker createClient() {
- final String awsAccessKeyId =
PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
- final String awsSecretAccessKey =
PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
- final String awsRegion =
PropertyUtils.getString(TaskConstants.AWS_REGION);
+ final String awsAccessKeyId = parameters.getUsername();
+ final String awsSecretAccessKey = parameters.getPassword();
+ final String awsRegion = parameters.getAwsRegion();
final BasicAWSCredentials basicAWSCredentials = new
BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
final AWSCredentialsProvider awsCredentialsProvider = new
AWSStaticCredentialsProvider(basicAWSCredentials);
// create a SageMaker client
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java
index 9fd8910831..4dcdb051e9 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java
@@ -43,7 +43,7 @@ public class SagemakerTaskChannel implements TaskChannel {
@Override
public ResourceParametersHelper getResources(String parameters) {
- return null;
+ return JSONUtils.parseObject(parameters,
SagemakerParameters.class).getResources();
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskExecutionContext.java
similarity index 62%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskExecutionContext.java
index 3b33eded1a..68b3590663 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskExecutionContext.java
@@ -17,27 +17,29 @@
package org.apache.dolphinscheduler.plugin.task.sagemaker;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import java.io.Serializable;
-import org.apache.commons.lang3.StringUtils;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+/**
+ * master/worker task transport
+ */
-@Getter
-@Setter
-@ToString
-public class SagemakerParameters extends AbstractParameters {
+public class SagemakerTaskExecutionContext implements Serializable {
/**
- * request script
+ * connectionParams
*/
- private String sagemakerRequestJson;
+ private String connectionParams;
- @Override
- public boolean checkParameters() {
- return StringUtils.isNotEmpty(sagemakerRequestJson);
+ public String getConnectionParams() {
+ return connectionParams;
}
+ public void setConnectionParams(String connectionParams) {
+ this.connectionParams = connectionParams;
+ }
+
+ @Override
+ public String toString() {
+ return "SagemakerTaskExecutionContext{" + "connectionParams='" +
connectionParams + '\'' + '}';
+ }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
index 07dd86d567..bbe3136dbf 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
@@ -18,19 +18,26 @@
package org.apache.dolphinscheduler.plugin.task.sagemaker;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerConnectionParam;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.commons.io.IOUtils;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -49,14 +56,30 @@ public class SagemakerTaskTest {
private AmazonSageMaker client;
private PipelineUtils pipelineUtils = new PipelineUtils();
+ private static final String MOCK_USERNAME = "lucky";
+ private static final String MOCK_PASSWORD = "root";
+ private static final String MOCK_TYPE = "SAGEMAKER";
+ private static final String MOCK_AWS_REGION = "REGION";
+
+ private static MockedStatic<DataSourceUtils> dataSourceUtilsStaticMock =
null;
+
@BeforeEach
public void before() {
String parameters = buildParameters();
+ System.out.println(parameters);
TaskExecutionContext taskExecutionContext =
Mockito.mock(TaskExecutionContext.class);
+ ResourceParametersHelper resourceParametersHelper =
Mockito.mock(ResourceParametersHelper.class);
+ SagemakerConnectionParam sagemakerConnectionParam =
Mockito.mock(SagemakerConnectionParam.class);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+
Mockito.when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper);
+
+ dataSourceUtilsStaticMock = Mockito.mockStatic(DataSourceUtils.class);
+ dataSourceUtilsStaticMock.when(() ->
DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any()))
+ .thenReturn(sagemakerConnectionParam);
client = Mockito.mock(AmazonSageMaker.class);
- sagemakerTask = new SagemakerTask(taskExecutionContext);
+ sagemakerTask = spy(new SagemakerTask(taskExecutionContext));
+ doReturn(client).when(sagemakerTask).createClient();
sagemakerTask.init();
StartPipelineExecutionResult startPipelineExecutionResult =
Mockito.mock(StartPipelineExecutionResult.class);
@@ -75,6 +98,11 @@ public class SagemakerTaskTest {
Mockito.lenient().when(client.describePipelineExecution(any())).thenReturn(describePipelineExecutionResult);
}
+ @AfterEach
+ public void afterEach() {
+ dataSourceUtilsStaticMock.close();
+ }
+
@Test
public void testStartPipelineRequest() throws Exception {
StartPipelineExecutionRequest request =
sagemakerTask.createStartPipelineRequest();
@@ -105,6 +133,10 @@ public class SagemakerTaskTest {
throw new RuntimeException(e);
}
parameters.setSagemakerRequestJson(sagemakerRequestJson);
+ parameters.setUsername(MOCK_USERNAME);
+ parameters.setPassword(MOCK_PASSWORD);
+ parameters.setAwsRegion(MOCK_AWS_REGION);
+ parameters.setType(MOCK_TYPE);
return JSONUtils.toJsonString(parameters);
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
index b3ddcbc791..c073627c91 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
@@ -32,10 +32,12 @@ import java.util.Objects;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
@Getter
@Setter
@ToString
+@Slf4j
public class ZeppelinParameters extends AbstractParameters {
/**
diff --git a/dolphinscheduler-ui/src/locales/en_US/datasource.ts
b/dolphinscheduler-ui/src/locales/en_US/datasource.ts
index 134bf015b1..e9b799b16e 100644
--- a/dolphinscheduler-ui/src/locales/en_US/datasource.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/datasource.ts
@@ -70,6 +70,10 @@ export default {
user_password_tips: 'Please enter your password',
aws_region: 'Aws Region',
aws_region_tips: 'Please enter AwsRegion',
+ aws_access_key: 'AwsAccessKey',
+ aws_access_key_tips: 'Please enter AwsAccessKey',
+ aws_secret_access_key: 'AwsSecretAccessKey',
+ aws_secret_access_key_tips: 'Please enter AwsSecretAccessKey',
validation: 'Validation',
mode_tips: 'Please select a mode',
jdbc_format_tips: 'jdbc connection parameters is not a correct JSON format',
@@ -87,5 +91,11 @@ export default {
SecretAccessKey: 'SecretAccessKey',
SecretAccessKey_tips: 'Please input SecretAccessKey',
dbUser: 'DbUser',
- dbUser_tips: 'Please input DbUser'
+ dbUser_tips: 'Please input DbUser',
+ zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
+ zeppelin_rest_endpoint_tips: 'Please input zeppelin restEndpoint',
+ kubeConfig: 'kubeConfig',
+ kubeConfig_tips: 'Please input KubeConfig',
+ namespace: 'namespace',
+ namespace_tips: 'Please input namespace'
}
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
index 434f339dce..7aa797a591 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
@@ -67,6 +67,10 @@ export default {
user_password_tips: '请输入密码',
aws_region: 'AwsRegion',
aws_region_tips: '请输入AwsRegion',
+ aws_access_key: 'AwsAccessKey',
+ aws_access_key_tips: '请输入AwsAccessKey',
+ aws_secret_access_key: 'AwsSecretAccessKey',
+ aws_secret_access_key_tips: '请输入AwsSecretAccessKey',
validation: '验证',
mode_tips: '请选择验证模式',
jdbc_format_tips: 'jdbc连接参数不是一个正确的JSON格式',
@@ -86,5 +90,9 @@ export default {
dbUser: 'DbUser',
dbUser_tips: '请输入DbUser',
zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
- zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint'
+ zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint',
+ kubeConfig: 'kubeConfig',
+ kubeConfig_tips: '请输入KubeConfig',
+ namespace: 'namespace',
+ namespace_tips: '请输入namespace'
}
diff --git a/dolphinscheduler-ui/src/service/modules/data-source/types.ts
b/dolphinscheduler-ui/src/service/modules/data-source/types.ts
index 8e84c7887b..a5fbb1a8a5 100644
--- a/dolphinscheduler-ui/src/service/modules/data-source/types.ts
+++ b/dolphinscheduler-ui/src/service/modules/data-source/types.ts
@@ -40,6 +40,7 @@ type IDataBase =
| 'DORIS'
| 'KYUUBI'
| 'ZEPPELIN'
+ | 'SAGEMAKER'
type IDataBaseLabel =
| 'MYSQL'
@@ -61,6 +62,7 @@ type IDataBaseLabel =
| 'SSH'
| 'KYUUBI'
| 'ZEPPELIN'
+ | 'SAGEMAKER'
interface IDataSource {
id?: number
diff --git a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
index 599977accf..3a7c1e6074 100644
--- a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
+++ b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
@@ -165,6 +165,7 @@ const DetailModal = defineComponent({
showPublicKey,
modeOptions,
redShiftModeOptions,
+ sagemakerModeOption,
loading,
saving,
testing,
@@ -321,6 +322,8 @@ const DetailModal = defineComponent({
options={
detailForm.type === 'REDSHIFT'
? redShiftModeOptions
+ : detailForm.type === 'SAGEMAKER'
+ ? sagemakerModeOption
: modeOptions
}
></NSelect>
@@ -497,7 +500,11 @@ const DetailModal = defineComponent({
/>
</NFormItem>
<NFormItem
- v-show={showMode && detailForm.mode === 'IAM-accessKey'}
+ v-show={
+ showMode &&
+ detailForm.mode === 'IAM-accessKey' &&
+ detailForm.type != 'SAGEMAKER'
+ }
label={t('datasource.dbUser')}
path='dbUser'
show-require-mark
diff --git a/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
b/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
index 8561af779f..ed383c3720 100644
--- a/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
+++ b/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
@@ -182,7 +182,8 @@ export function useForm(id?: number) {
if (
!state.detailForm.dbUser &&
state.showMode &&
- state.detailForm.mode === 'IAM-accessKey'
+ state.detailForm.mode === 'IAM-accessKey' &&
+ state.detailForm.type != 'SAGEMAKER'
) {
return new Error(t('datasource.IAM-accessKey'))
}
@@ -228,6 +229,12 @@ export function useForm(id?: number) {
label: 'IAM-accessKey',
value: 'IAM-accessKey'
}
+ ],
+ sagemakerModeOption: [
+ {
+ label: 'IAM-accessKey',
+ value: 'IAM-accessKey'
+ }
]
})
@@ -239,8 +246,8 @@ export function useForm(id?: number) {
state.showHost = type !== 'ATHENA'
state.showPort = type !== 'ATHENA'
- state.showAwsRegion = type === 'ATHENA'
- state.showMode = ['AZURESQL', 'REDSHIFT'].includes(type)
+ state.showAwsRegion = type === 'ATHENA' || type === 'SAGEMAKER'
+ state.showMode = ['AZURESQL', 'REDSHIFT', 'SAGEMAKER'].includes(type)
if (type === 'ORACLE' && !id) {
state.detailForm.connectType = 'ORACLE_SERVICE_NAME'
@@ -254,7 +261,7 @@ export function useForm(id?: number) {
} else {
state.showPrincipal = false
}
- if (type === 'SSH' || type === 'ZEPPELIN') {
+ if (type === 'SSH' || type === 'ZEPPELIN' || type === 'SAGEMAKER') {
state.showDataBaseName = false
state.requiredDataBase = false
state.showJDBCConnectParameters = false
@@ -267,6 +274,10 @@ export function useForm(id?: number) {
state.showPort = false
state.showRestEndpoint = true
}
+ if (type === 'SAGEMAKER') {
+ state.showHost = false
+ state.showPort = false
+ }
} else {
state.showDataBaseName = true
state.requiredDataBase = true
@@ -425,6 +436,11 @@ export const datasourceType: IDataBaseOptionKeys = {
value: 'DORIS',
label: 'DORIS',
defaultPort: 9030
+ },
+ SAGEMAKER: {
+ value: 'SAGEMAKER',
+ label: 'SAGEMAKER',
+ defaultPort: 0
}
}
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
index 3b2756b5e7..d685b64de2 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
@@ -142,6 +142,11 @@ export function useDatasource(
id: 23,
code: 'DORIS',
disabled: false
+ },
+ {
+ id: 24,
+ code: 'SAGEMAKER',
+ disabled: false
}
]
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index f9801708e6..a5598cd8df 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -407,6 +407,11 @@ export function formatParams(data: INodeData): {
if (data.taskType === 'SAGEMAKER') {
taskParams.sagemakerRequestJson = data.sagemakerRequestJson
+ taskParams.username = data.username
+ taskParams.password = data.password
+ taskParams.datasource = data.datasource
+ taskParams.type = data.type
+ taskParams.awsRegion = data.awsRegion
}
if (data.taskType === 'PYTORCH') {
taskParams.script = data.script
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
index 3bcb13e052..fa11be5a97 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
@@ -43,7 +43,12 @@ export function userSagemaker({
workerGroup: 'default',
delayTime: 0,
timeout: 30,
- timeoutNotifyStrategy: ['WARN']
+ type: 'SAGEMAKER',
+ displayRows: 10,
+ timeoutNotifyStrategy: ['WARN'],
+ username: '',
+ password: '',
+ awsRegion: ''
} as INodeData)
return {
@@ -60,6 +65,7 @@ export function userSagemaker({
...Fields.useFailed(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
+ ...Fields.useDatasource(model),
...Fields.useSagemaker(model),
Fields.usePreTasks()
] as IJsonItem[],
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 9c48fea5e0..e453a3d7f9 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -451,6 +451,8 @@ interface ITaskParams {
filterCondition?: string
listParameters?: Array<any>
yarnQueue?: string
+ awsRegion?: string
+ kubeConfig?: string
}
interface INodeData