This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 59618a4cf [INLONG-4376][Manager] Support SqlServer sink (#4380)
59618a4cf is described below

commit 59618a4cfee258b3275fbb506805fc0a21d7a232
Author: jiancheng Lv <[email protected]>
AuthorDate: Thu Jun 2 09:03:25 2022 +0800

    [INLONG-4376][Manager] Support SqlServer sink (#4380)
---
 .../inlong/manager/common/enums/SinkType.java      |   4 +-
 .../common/pojo/sink/sqlserver/SqlServerSink.java  |  73 +++++++
 .../pojo/sink/sqlserver/SqlServerSinkDTO.java      |  95 +++++++++
 .../sink/sqlserver/SqlServerSinkListResponse.java  |  67 ++++++
 .../pojo/sink/sqlserver/SqlServerSinkRequest.java  |  63 ++++++
 .../sink/sqlserver/SqlServerSinkOperation.java     | 229 +++++++++++++++++++++
 .../manager/service/sort/util/LoadNodeUtils.java   |  46 ++++-
 .../core/sink/SqlServerStreamSinkServiceTest.java  | 102 +++++++++
 8 files changed, 676 insertions(+), 3 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index 322dd69c6..f415ba300 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -21,7 +21,7 @@ import java.util.Locale;
 
 public enum SinkType {
 
-    HIVE, KAFKA, ICEBERG, CLICKHOUSE, HBASE, POSTGRES, ELASTICSEARCH;
+    HIVE, KAFKA, ICEBERG, CLICKHOUSE, HBASE, POSTGRES, ELASTICSEARCH, 
SQLSERVER;
 
     public static final String SINK_HIVE = "HIVE";
     public static final String SINK_KAFKA = "KAFKA";
@@ -30,7 +30,7 @@ public enum SinkType {
     public static final String SINK_CLICKHOUSE = "CLICKHOUSE";
     public static final String SINK_POSTGRES = "POSTGRES";
     public static final String SINK_ELASTICSEARCH = "ELASTICSEARCH";
-
+    public static final String SINK_SQLSERVER = "SQLSERVER";
     /**
      * Get the SinkType enum via the given sinkType string
      */
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSink.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSink.java
new file mode 100644
index 000000000..448536572
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.sqlserver;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * SqlServer sink info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "SqlServer sink info")
+@JsonTypeDefine(value = SinkType.SINK_SQLSERVER)
+public class SqlServerSink extends StreamSink {
+
+    @ApiModelProperty("Username of the Sqlserver")
+    private String username;
+
+    @ApiModelProperty("Password of the Sqlserver")
+    private String password;
+
+    @ApiModelProperty("sqlserver meta db URL, etc 
jdbc:sqlserver://host:port;databaseName=database")
+    private String jdbcUrl;
+
+    @ApiModelProperty("schemaName of the Sqlserver")
+    private String schemaName;
+
+    @ApiModelProperty("tableName of the Sqlserver")
+    private String tableName;
+
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String serverTimezone;
+
+    @ApiModelProperty("Whether to migrate all databases")
+    private boolean allMigration;
+
+    @ApiModelProperty(value = "Primary key must be shared by all tables")
+    private String primaryKey;
+
+    public SqlServerSink() {
+        this.setSinkType(SinkType.SINK_SQLSERVER);
+    }
+
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, SqlServerSinkRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSinkDTO.java
new file mode 100644
index 000000000..ae42a6420
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSinkDTO.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.sqlserver;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Sqlserver source info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SqlServerSinkDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @ApiModelProperty("Username of the Sqlserver")
+    private String username;
+
+    @ApiModelProperty("Password of the Sqlserver")
+    private String password;
+
+    @ApiModelProperty("sqlserver meta db URL, etc 
jdbc:sqlserver://host:port;databaseName=database")
+    private String jdbcUrl;
+
+    @ApiModelProperty("schemaName of the Sqlserver")
+    private String schemaName;
+
+    @ApiModelProperty("tableName of the Sqlserver")
+    private String tableName;
+
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String serverTimezone;
+
+    @ApiModelProperty("Whether to migrate all databases")
+    private boolean allMigration;
+
+    @ApiModelProperty(value = "Primary key must be shared by all tables")
+    private String primaryKey;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static SqlServerSinkDTO getFromRequest(SqlServerSinkRequest 
request) {
+        return SqlServerSinkDTO.builder()
+                .username(request.getUsername())
+                .password(request.getPassword())
+                .jdbcUrl(request.getJdbcUrl())
+                .schemaName(request.getSchemaName())
+                .tableName(request.getTableName())
+                .serverTimezone(request.getServerTimezone())
+                .allMigration(request.isAllMigration())
+                .primaryKey(request.getPrimaryKey())
+                .build();
+    }
+
+    /**
+     * Get the dto instance from json
+     */
+    public static SqlServerSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+            return OBJECT_MAPPER.readValue(extParams, SqlServerSinkDTO.class);
+        } catch (Exception e) {
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSinkListResponse.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSinkListResponse.java
new file mode 100644
index 000000000..24c789f38
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSinkListResponse.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.sqlserver;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Response info of sqlserver source list
+ */
+@Data
+@SuperBuilder
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(SinkType.SINK_SQLSERVER)
+@ApiModel("Response of sqlserver sink paging list")
+public class SqlServerSinkListResponse extends SinkListResponse {
+
+    @ApiModelProperty("Username of the Sqlserver")
+    private String username;
+
+    @ApiModelProperty("Password of the Sqlserver")
+    private String password;
+
+    @ApiModelProperty("sqlserver meta db URL, etc 
jdbc:sqlserver://host:port;databaseName=database")
+    private String jdbcUrl;
+
+    @ApiModelProperty("schemaName of the Sqlserver")
+    private String schemaName;
+
+    @ApiModelProperty("tableName of the Sqlserver")
+    private String tableName;
+
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String serverTimezone;
+
+    @ApiModelProperty("Whether to migrate all databases")
+    private boolean allMigration;
+
+    @ApiModelProperty(value = "Primary key must be shared by all tables")
+    private String primaryKey;
+
+}
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSinkRequest.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSinkRequest.java
new file mode 100644
index 000000000..115b4de69
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SqlServerSinkRequest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.sqlserver;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of the sqlserver sink info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the sqlserver sink info")
+@JsonTypeDefine(value = SinkType.SINK_SQLSERVER)
+public class SqlServerSinkRequest extends SinkRequest {
+
+    @ApiModelProperty("Username of the Sqlserver")
+    private String username;
+
+    @ApiModelProperty("Password of the Sqlserver")
+    private String password;
+
+    @ApiModelProperty("sqlserver meta db URL, etc 
jdbc:sqlserver://host:port;databaseName=database")
+    private String jdbcUrl;
+
+    @ApiModelProperty("schemaName of the Sqlserver")
+    private String schemaName;
+
+    @ApiModelProperty("tableName of the Sqlserver")
+    private String tableName;
+
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String serverTimezone;
+
+    @ApiModelProperty("Whether to migrate all databases")
+    private boolean allMigration;
+
+    @ApiModelProperty(value = "Primary key must be shared by all tables")
+    private String primaryKey;
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/sqlserver/SqlServerSinkOperation.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/sqlserver/SqlServerSinkOperation.java
new file mode 100644
index 000000000..cd6cb1b82
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/sqlserver/SqlServerSinkOperation.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.sqlserver;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.sink.sqlserver.SqlServerSink;
+import org.apache.inlong.manager.common.pojo.sink.sqlserver.SqlServerSinkDTO;
+import 
org.apache.inlong.manager.common.pojo.sink.sqlserver.SqlServerSinkListResponse;
+import 
org.apache.inlong.manager.common.pojo.sink.sqlserver.SqlServerSinkRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.sink.StreamSinkOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * SqlServer sink operation
+ */
+@Service
+public class SqlServerSinkOperation implements StreamSinkOperation {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SqlServerSinkOperation.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private StreamSinkEntityMapper sinkMapper;
+    @Autowired
+    private StreamSinkFieldEntityMapper sinkFieldMapper;
+
+    @Override
+    public Boolean accept(SinkType sinkType) {
+        return SinkType.SQLSERVER.equals(sinkType);
+    }
+
+    @Override
+    public Integer saveOpt(SinkRequest request, String operator) {
+        String sinkType = request.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_SQLSERVER.equals(sinkType),
+                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
sinkType);
+
+        SqlServerSinkRequest sqlServerSinkRequest = (SqlServerSinkRequest) 
request;
+        StreamSinkEntity entity = 
CommonBeanUtils.copyProperties(sqlServerSinkRequest, StreamSinkEntity::new);
+        entity.setStatus(SinkStatus.NEW.getCode());
+        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        Date now = new Date();
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
+
+        // get the ext params
+        SqlServerSinkDTO dto = 
SqlServerSinkDTO.getFromRequest(sqlServerSinkRequest);
+        try {
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED);
+        }
+        sinkMapper.insert(entity);
+        Integer sinkId = entity.getId();
+        request.setId(sinkId);
+        this.saveFieldOpt(request);
+        return sinkId;
+    }
+
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkField> fieldList = request.getFieldList();
+        LOGGER.info("begin to save field={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+
+        int size = fieldList.size();
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+        for (SinkField fieldInfo : fieldList) {
+            StreamSinkFieldEntity fieldEntity = 
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.info("success to save field");
+    }
+
+    @Override
+    public StreamSink getByEntity(@NotNull StreamSinkEntity entity) {
+        Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_SQLSERVER.equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), 
SinkType.SINK_SQLSERVER, existType));
+        StreamSink response = this.getFromEntity(entity, SqlServerSink::new);
+        List<StreamSinkFieldEntity> entities = 
sinkFieldMapper.selectBySinkId(entity.getId());
+        List<SinkField> infos = CommonBeanUtils.copyListProperties(entities, 
SinkField::new);
+        response.setFieldList(infos);
+        return response;
+    }
+
+    @Override
+    public <T> T getFromEntity(StreamSinkEntity entity, Supplier<T> target) {
+        T result = target.get();
+        if (entity == null) {
+            return result;
+        }
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_SQLSERVER.equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), 
SinkType.SINK_SQLSERVER, existType));
+
+        SqlServerSinkDTO dto = 
SqlServerSinkDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(dto, result, true);
+
+        return result;
+    }
+
+    @Override
+    public PageInfo<? extends SinkListResponse> 
getPageInfo(Page<StreamSinkEntity> entityPage) {
+        if (CollectionUtils.isEmpty(entityPage)) {
+            return new PageInfo<>();
+        }
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, 
SqlServerSinkListResponse::new));
+    }
+
+    @Override
+    public void updateOpt(SinkRequest request, String operator) {
+        String sinkType = request.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_SQLSERVER.equals(sinkType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), 
SinkType.SINK_SQLSERVER, sinkType));
+
+        StreamSinkEntity entity = 
sinkMapper.selectByPrimaryKey(request.getId());
+        Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+        SqlServerSinkRequest sqlServerSinkRequest = (SqlServerSinkRequest) 
request;
+        CommonBeanUtils.copyProperties(sqlServerSinkRequest, entity, true);
+        try {
+            SqlServerSinkDTO dto = 
SqlServerSinkDTO.getFromRequest(sqlServerSinkRequest);
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+        }
+
+        entity.setPreviousStatus(entity.getStatus());
+        entity.setStatus(SinkStatus.CONFIG_ING.getCode());
+        entity.setModifier(operator);
+        entity.setModifyTime(new Date());
+        sinkMapper.updateByPrimaryKeySelective(entity);
+
+        boolean onlyAdd = 
SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
+        this.updateFieldOpt(onlyAdd, sqlServerSinkRequest);
+
+        LOGGER.info("success to update sink of type={}", sinkType);
+    }
+
+    @Override
+    public void updateFieldOpt(Boolean onlyAdd, SinkRequest request) {
+        Integer sinkId = request.getId();
+        List<SinkField> fieldRequestList = request.getFieldList();
+        if (CollectionUtils.isEmpty(fieldRequestList)) {
+            return;
+        }
+        if (onlyAdd) {
+            List<StreamSinkFieldEntity> existsFieldList = 
sinkFieldMapper.selectBySinkId(sinkId);
+            if (existsFieldList.size() > fieldRequestList.size()) {
+                throw new 
BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
+            }
+            for (int i = 0; i < existsFieldList.size(); i++) {
+                if 
(!existsFieldList.get(i).getFieldName().equals(fieldRequestList.get(i).getFieldName()))
 {
+                    throw new 
BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
+                }
+            }
+        }
+        // First physically delete the existing fields
+        sinkFieldMapper.deleteAll(sinkId);
+        // Then batch save the sink fields
+        this.saveFieldOpt(request);
+        LOGGER.info("success to update field");
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 1d3ee4e2e..fc81e9721 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -33,6 +33,7 @@ import 
org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
 import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSink;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSink;
 import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSink;
+import org.apache.inlong.manager.common.pojo.sink.sqlserver.SqlServerSink;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.format.AvroFormat;
@@ -47,6 +48,7 @@ import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
+import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.HashMap;
@@ -87,6 +89,8 @@ public class LoadNodeUtils {
                 return createLoadNode((ClickHouseSink) streamSink);
             case ICEBERG:
                 return createLoadNode((IcebergSink) streamSink);
+            case SQLSERVER:
+                return createLoadNode((SqlServerSink) streamSink);
             default:
                 throw new IllegalArgumentException(
                         String.format("Unsupported sinkType=%s to create 
loadNode", sinkType));
@@ -262,7 +266,7 @@ public class LoadNodeUtils {
     }
 
     /**
-     * create iceberg load node
+     * Create iceberg load node
      */
     public static IcebergLoadNode createLoadNode(IcebergSink icebergSink) {
         String id = icebergSink.getSinkName();
@@ -283,6 +287,46 @@ public class LoadNodeUtils {
         return new IcebergLoadNode(id, name, fields, fieldRelationShips, null, 
null, 1, properties,
                 dbName, tableName, null, null, uri, warehouse);
     }
+  
+    /**
+     * Create SqlServer load node based on SqlServerSink
+     *
+     * @param sqlServerSink SqlServer sink info
+     * @return SqlServer load node info
+     */
+    public static SqlServerLoadNode createLoadNode(SqlServerSink 
sqlServerSink) {
+        final String id = sqlServerSink.getSinkName();
+        final String name = sqlServerSink.getSinkName();
+        final String primaryKey = sqlServerSink.getPrimaryKey();
+        final String jdbcUrl = sqlServerSink.getJdbcUrl();
+        final String userName = sqlServerSink.getUsername();
+        final String password = sqlServerSink.getPassword();
+        final String schemaName = sqlServerSink.getSchemaName();
+        final String tablename = sqlServerSink.getTableName();
+        final List<SinkField> fieldList = sqlServerSink.getFieldList();
+        List<FieldInfo> fields = fieldList.stream()
+                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, 
name))
+                .collect(Collectors.toList());
+        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
+        Map<String, String> properties = 
sqlServerSink.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
+        return new SqlServerLoadNode(
+                id,
+                name,
+                fields,
+                fieldRelations,
+                null,
+                null,
+                null,
+                properties,
+                jdbcUrl,
+                userName,
+                password,
+                schemaName,
+                tablename,
+                primaryKey
+                );
+    }
 
     /**
      * Parse information field of data sink.
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/SqlServerStreamSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/SqlServerStreamSinkServiceTest.java
new file mode 100644
index 000000000..ac1955c05
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/SqlServerStreamSinkServiceTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.sink;
+
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.sink.sqlserver.SqlServerSink;
+import 
org.apache.inlong.manager.common.pojo.sink.sqlserver.SqlServerSinkRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Stream sink service test
+ */
+public class SqlServerStreamSinkServiceTest extends ServiceBaseTest {
+
+    private static final String globalGroupId = "b_group1_sqlserver";
+    private static final String globalStreamId = "stream1_sqlserver";
+    private static final String globalOperator = "admin";
+
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private InlongStreamServiceTest streamServiceTest;
+
+    /**
+     * Save sink info.
+     */
+    public Integer saveSink(String sinkName) {
+        streamServiceTest.saveInlongStream(globalGroupId, globalStreamId,
+                globalOperator);
+        SqlServerSinkRequest sinkInfo = new SqlServerSinkRequest();
+        sinkInfo.setInlongGroupId(globalGroupId);
+        sinkInfo.setInlongStreamId(globalStreamId);
+        sinkInfo.setSinkType(SinkType.SINK_SQLSERVER);
+
+        sinkInfo.setJdbcUrl("jdbc:sqlserver://localhost:5432/sqlserver");
+        sinkInfo.setUsername("sqlserver");
+        sinkInfo.setPassword("inlong");
+        sinkInfo.setTableName("user");
+        sinkInfo.setSchemaName("test");
+        sinkInfo.setPrimaryKey("name,age");
+
+        sinkInfo.setSinkName(sinkName);
+        
sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        return sinkService.save(sinkInfo, globalOperator);
+    }
+
+    /**
+     * Delete sqlserver sink info by sink id.
+     */
+    public void deleteSqlServerSink(Integer sqlserverSinkId) {
+        boolean result = sinkService.delete(sqlserverSinkId, globalOperator);
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void testListByIdentifier() {
+        Integer sqlserverSinkId = this.saveSink("sqlserver_default1");
+        StreamSink sink = sinkService.get(sqlserverSinkId);
+        Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
+        deleteSqlServerSink(sqlserverSinkId);
+    }
+
+    @Test
+    public void testGetAndUpdate() {
+        Integer sqlserverSinkId = this.saveSink("sqlserver_default2");
+        StreamSink response = sinkService.get(sqlserverSinkId);
+        Assert.assertEquals(globalGroupId, response.getInlongGroupId());
+
+        SqlServerSink sqlServerSink = (SqlServerSink) response;
+        
sqlServerSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+
+        SqlServerSinkRequest request = 
CommonBeanUtils.copyProperties(sqlServerSink,
+                SqlServerSinkRequest::new);
+        boolean result = sinkService.update(request, globalOperator);
+        Assert.assertTrue(result);
+        deleteSqlServerSink(sqlserverSinkId);
+    }
+
+}

Reply via email to