This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 23c5959e9 [INLONG-4528][Manager] Support Greenplum sink (#4531)
23c5959e9 is described below
commit 23c5959e9bd4e8c7cbe8499c3840b25f254b933e
Author: jiancheng Lv <[email protected]>
AuthorDate: Tue Jun 7 23:22:50 2022 +0800
[INLONG-4528][Manager] Support Greenplum sink (#4531)
---
.../inlong/manager/common/enums/SinkType.java | 2 +
.../common/pojo/sink/greenplum/GreenplumSink.java | 65 ++++++
.../pojo/sink/greenplum/GreenplumSinkDTO.java | 88 ++++++++
.../sink/greenplum/GreenplumSinkListResponse.java | 57 +++++
.../pojo/sink/greenplum/GreenplumSinkRequest.java | 53 +++++
.../sink/greenplum/GreenplumSinkOperation.java | 229 +++++++++++++++++++++
.../manager/service/sort/util/LoadNodeUtils.java | 34 +++
.../core/sink/GreenplumStreamSinkServiceTest.java | 112 ++++++++++
8 files changed, 640 insertions(+)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index 49e15a801..a798abf11 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -30,6 +30,7 @@ public enum SinkType {
ELASTICSEARCH,
SQLSERVER,
HDFS,
+ GREENPLUM,
;
@@ -42,6 +43,7 @@ public enum SinkType {
public static final String SINK_ELASTICSEARCH = "ELASTICSEARCH";
public static final String SINK_SQLSERVER = "SQLSERVER";
public static final String SINK_HDFS = "HDFS";
+ public static final String SINK_GREENPLUM = "GREENPLUM";
/**
* Get the SinkType enum via the given sinkType string
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSink.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSink.java
new file mode 100644
index 000000000..e62ef40f3
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSink.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.greenplum;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Greenplum sink info.
+ * <p>
+ * Carries the Greenplum-specific connection settings on top of the common {@link StreamSink}
+ * attributes. The sink type is fixed to {@link SinkType#SINK_GREENPLUM} on construction.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Greenplum sink info")
+@JsonTypeDefine(value = SinkType.SINK_GREENPLUM)
+public class GreenplumSink extends StreamSink {
+
+    @ApiModelProperty("Greenplum JDBC URL such as:jdbc:postgresql://host:port/database")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    // Kept out of the generated toString() so the credential cannot leak into logs.
+    @ToString.Exclude
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    /**
+     * Create a Greenplum sink whose sink type is preset to {@link SinkType#SINK_GREENPLUM}.
+     */
+    public GreenplumSink() {
+        this.setSinkType(SinkType.SINK_GREENPLUM);
+    }
+
+    /**
+     * Generate the sink request by copying the properties of this sink.
+     *
+     * @return a new {@link GreenplumSinkRequest} populated from this instance
+     */
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, GreenplumSinkRequest::new);
+    }
+
+}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkDTO.java
new file mode 100644
index 000000000..9d2421531
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkDTO.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.greenplum;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+import java.util.Map;
+
+/**
+ * Greenplum sink info.
+ * <p>
+ * Holds the Greenplum-specific settings that are serialized to and parsed from the JSON
+ * ext params of a sink.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class GreenplumSinkDTO {
+
+    // Configured exactly once, before first use: an ObjectMapper is only safe to share between
+    // threads when its configuration is not changed after it has started (de)serializing.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    @ApiModelProperty("Greenplum JDBC URL such as:jdbc:postgresql://host:port/database")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Properties for greenplum")
+    private Map<String, Object> properties;
+
+    /**
+     * Get the dto instance from the request
+     *
+     * @param request the Greenplum sink request to read the settings from
+     * @return a DTO holding the Greenplum-specific settings of the request
+     */
+    public static GreenplumSinkDTO getFromRequest(GreenplumSinkRequest request) {
+        return GreenplumSinkDTO.builder()
+                .jdbcUrl(request.getJdbcUrl())
+                .username(request.getUsername())
+                .password(request.getPassword())
+                .primaryKey(request.getPrimaryKey())
+                .tableName(request.getTableName())
+                .properties(request.getProperties())
+                .build();
+    }
+
+    /**
+     * Get the dto instance from the json
+     *
+     * @param extParams the JSON string to parse; unknown JSON properties are ignored
+     * @return the parsed DTO
+     * @throws BusinessException if the JSON cannot be parsed into a DTO
+     */
+    public static GreenplumSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            return OBJECT_MAPPER.readValue(extParams, GreenplumSinkDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkListResponse.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkListResponse.java
new file mode 100644
index 000000000..2db425f48
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkListResponse.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.greenplum;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Response of Greenplum sink list.
+ * <p>
+ * One entry of the paged sink list, extended with the Greenplum connection settings.
+ */
+@Data
+@SuperBuilder
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = SinkType.SINK_GREENPLUM)
+@ApiModel(value = "Response of Greenplum sink paging list")
+public class GreenplumSinkListResponse extends SinkListResponse {
+
+    @ApiModelProperty(value = "Greenplum JDBC URL such as:jdbc:postgresql://host:port/database")
+    private String jdbcUrl;
+
+    @ApiModelProperty(value = "Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty(value = "User password")
+    private String password;
+
+    @ApiModelProperty(value = "Target table name")
+    private String tableName;
+
+    @ApiModelProperty(value = "Primary key")
+    private String primaryKey;
+}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkRequest.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkRequest.java
new file mode 100644
index 000000000..827ef4ff2
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkRequest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.greenplum;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of the Greenplum sink info.
+ * <p>
+ * Extends the common {@link SinkRequest} with the Greenplum connection settings.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the Greenplum sink info")
+@JsonTypeDefine(value = SinkType.SINK_GREENPLUM)
+public class GreenplumSinkRequest extends SinkRequest {
+
+    @ApiModelProperty("Greenplum JDBC URL such as:jdbc:postgresql://host:port/database")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    // Kept out of the generated toString() so the credential cannot leak when a request is logged.
+    @ToString.Exclude
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/greenplum/GreenplumSinkOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/greenplum/GreenplumSinkOperation.java
new file mode 100644
index 000000000..03f7b4712
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/greenplum/GreenplumSinkOperation.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.greenplum;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.sink.greenplum.GreenplumSink;
+import org.apache.inlong.manager.common.pojo.sink.greenplum.GreenplumSinkDTO;
+import
org.apache.inlong.manager.common.pojo.sink.greenplum.GreenplumSinkListResponse;
+import
org.apache.inlong.manager.common.pojo.sink.greenplum.GreenplumSinkRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.sink.StreamSinkOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Greenplum sink operation.
+ * <p>
+ * Saves, queries and updates sinks of type {@link SinkType#GREENPLUM}; the Greenplum-specific
+ * settings are stored as JSON in the ext params of the sink entity.
+ */
+@Service
+public class GreenplumSinkOperation implements StreamSinkOperation {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GreenplumSinkOperation.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private StreamSinkEntityMapper sinkMapper;
+    @Autowired
+    private StreamSinkFieldEntityMapper sinkFieldMapper;
+
+    /**
+     * Whether this operation handles the given sink type.
+     *
+     * @param sinkType the sink type to check
+     * @return true only for {@link SinkType#GREENPLUM}
+     */
+    @Override
+    public Boolean accept(SinkType sinkType) {
+        return SinkType.GREENPLUM.equals(sinkType);
+    }
+
+    /**
+     * Save the sink info and its fields.
+     *
+     * @param request the sink request, must be a {@link GreenplumSinkRequest}
+     * @param operator name of the operator, recorded as creator and modifier
+     * @return id of the saved sink
+     * @throws BusinessException if the ext params cannot be serialized
+     */
+    @Override
+    public Integer saveOpt(SinkRequest request, String operator) {
+        String sinkType = request.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_GREENPLUM.equals(sinkType),
+                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
+
+        GreenplumSinkRequest greenplumSinkRequest = (GreenplumSinkRequest) request;
+        StreamSinkEntity entity = CommonBeanUtils.copyProperties(greenplumSinkRequest, StreamSinkEntity::new);
+        entity.setStatus(SinkStatus.NEW.getCode());
+        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        Date now = new Date();
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
+
+        // get the ext params
+        GreenplumSinkDTO dto = GreenplumSinkDTO.getFromRequest(greenplumSinkRequest);
+        try {
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            // Log the cause before translating it; the DTO itself is not logged as it holds the password
+            LOGGER.error("failed to serialize ext params of greenplum sink", e);
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED);
+        }
+        sinkMapper.insert(entity);
+        Integer sinkId = entity.getId();
+        // The field entities saved below are linked to the sink through the id set here
+        request.setId(sinkId);
+        this.saveFieldOpt(request);
+        return sinkId;
+    }
+
+    /**
+     * Save the fields of the sink in one batch; does nothing if the request has no fields.
+     *
+     * @param request the sink request carrying the field list and the sink id
+     */
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkField> fieldList = request.getFieldList();
+        LOGGER.info("begin to save field={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+
+        int size = fieldList.size();
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+        for (SinkField fieldInfo : fieldList) {
+            StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo,
+                    StreamSinkFieldEntity::new);
+            // Fall back to the field name when no comment was given
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.info("success to save field");
+    }
+
+    /**
+     * Get the sink info, including its fields, from the sink entity.
+     *
+     * @param entity the sink entity, must be of the Greenplum sink type
+     * @return the Greenplum sink with its field list
+     */
+    @Override
+    public StreamSink getByEntity(@NotNull StreamSinkEntity entity) {
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_GREENPLUM.equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_GREENPLUM, existType));
+        StreamSink response = this.getFromEntity(entity, GreenplumSink::new);
+        List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(entity.getId());
+        List<SinkField> infos = CommonBeanUtils.copyListProperties(entities, SinkField::new);
+        response.setFieldList(infos);
+        return response;
+    }
+
+    /**
+     * Copy the entity and the settings parsed from its ext params into a new target object.
+     *
+     * @param entity the sink entity; if null, the freshly created target is returned unchanged
+     * @param target supplier of the object to populate
+     * @param <T> type of the target object
+     * @return the populated target object
+     */
+    @Override
+    public <T> T getFromEntity(StreamSinkEntity entity, Supplier<T> target) {
+        T result = target.get();
+        if (entity == null) {
+            return result;
+        }
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_GREENPLUM.equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_GREENPLUM, existType));
+
+        GreenplumSinkDTO dto = GreenplumSinkDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(dto, result, true);
+
+        return result;
+    }
+
+    /**
+     * Convert a page of sink entities into a page of Greenplum list responses.
+     *
+     * @param entityPage the page of sink entities
+     * @return the converted page info, or an empty page info if the page has no entities
+     */
+    @Override
+    public PageInfo<? extends SinkListResponse> getPageInfo(Page<StreamSinkEntity> entityPage) {
+        if (CollectionUtils.isEmpty(entityPage)) {
+            return new PageInfo<>();
+        }
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, GreenplumSinkListResponse::new));
+    }
+
+    /**
+     * Update the sink info and its fields, and move the sink into the configuring status.
+     *
+     * @param request the sink request, must be a {@link GreenplumSinkRequest}
+     * @param operator name of the operator, recorded as modifier
+     * @throws BusinessException if the ext params cannot be serialized
+     */
+    @Override
+    public void updateOpt(SinkRequest request, String operator) {
+        String sinkType = request.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_GREENPLUM.equals(sinkType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_GREENPLUM, sinkType));
+
+        StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+        GreenplumSinkRequest greenplumSinkRequest = (GreenplumSinkRequest) request;
+        CommonBeanUtils.copyProperties(greenplumSinkRequest, entity, true);
+        try {
+            GreenplumSinkDTO dto = GreenplumSinkDTO.getFromRequest(greenplumSinkRequest);
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            // Log the cause before translating it; the DTO itself is not logged as it holds the password
+            LOGGER.error("failed to serialize ext params of greenplum sink", e);
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+        }
+
+        entity.setPreviousStatus(entity.getStatus());
+        entity.setStatus(SinkStatus.CONFIG_ING.getCode());
+        entity.setModifier(operator);
+        entity.setModifyTime(new Date());
+        sinkMapper.updateByPrimaryKeySelective(entity);
+
+        // A sink that was configured successfully may only get new fields appended
+        boolean onlyAdd = SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
+        this.updateFieldOpt(onlyAdd, greenplumSinkRequest);
+
+        LOGGER.info("success to update sink of type={}", sinkType);
+    }
+
+    /**
+     * Replace the fields of the sink with the fields of the request; does nothing if the
+     * request has no fields.
+     *
+     * @param onlyAdd if true, existing fields must be kept in place, with the same names, and
+     *         fields may only be appended; null is treated as false
+     * @param request the sink request carrying the new field list
+     * @throws BusinessException if onlyAdd is true and an existing field was removed or renamed
+     */
+    @Override
+    public void updateFieldOpt(Boolean onlyAdd, SinkRequest request) {
+        Integer sinkId = request.getId();
+        List<SinkField> fieldRequestList = request.getFieldList();
+        if (CollectionUtils.isEmpty(fieldRequestList)) {
+            return;
+        }
+        // Boolean.TRUE.equals avoids the NullPointerException that unboxing a null flag would throw
+        if (Boolean.TRUE.equals(onlyAdd)) {
+            List<StreamSinkFieldEntity> existsFieldList = sinkFieldMapper.selectBySinkId(sinkId);
+            if (existsFieldList.size() > fieldRequestList.size()) {
+                throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
+            }
+            for (int i = 0; i < existsFieldList.size(); i++) {
+                String existsName = existsFieldList.get(i).getFieldName();
+                if (!existsName.equals(fieldRequestList.get(i).getFieldName())) {
+                    throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
+                }
+            }
+        }
+        // First physically delete the existing fields
+        sinkFieldMapper.deleteAll(sinkId);
+        // Then batch save the sink fields
+        this.saveFieldOpt(request);
+        LOGGER.info("success to update field");
+    }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 6ce8cab49..19bfb3b76 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSink;
import org.apache.inlong.manager.common.pojo.sink.es.ElasticsearchSink;
+import org.apache.inlong.manager.common.pojo.sink.greenplum.GreenplumSink;
import org.apache.inlong.manager.common.pojo.sink.hbase.HBaseSink;
import org.apache.inlong.manager.common.pojo.sink.hdfs.HdfsSink;
import org.apache.inlong.manager.common.pojo.sink.hive.HivePartitionField;
@@ -48,6 +49,7 @@ import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
@@ -100,6 +102,8 @@ public class LoadNodeUtils {
return createLoadNode((ElasticsearchSink) streamSink);
case HDFS:
return createLoadNode((HdfsSink) streamSink);
+ case GREENPLUM:
+ return createLoadNode((GreenplumSink) streamSink);
default:
throw new BusinessException(String.format("Unsupported
sinkType=%s to create load node", sinkType));
}
@@ -422,6 +426,36 @@ public class LoadNodeUtils {
);
}
+    /**
+     * Create greenplum load node
+     * <p>
+     * The sink name is used as both the id and the name of the node. Sink properties are
+     * converted to strings; a sink without properties yields an empty map, and properties
+     * with a null value are skipped.
+     *
+     * @param greenplumSink the Greenplum sink to create the load node from
+     * @return the Greenplum load node
+     */
+    public static GreenplumLoadNode createLoadNode(GreenplumSink greenplumSink) {
+        String id = greenplumSink.getSinkName();
+        String name = greenplumSink.getSinkName();
+        List<SinkField> fieldList = greenplumSink.getFieldList();
+        List<FieldInfo> fields = fieldList.stream()
+                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
+                .collect(Collectors.toList());
+        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
+        // The properties are optional on a sink, and Collectors.toMap rejects null values,
+        // so the map is filled by hand instead of streaming over a possibly-null map.
+        Map<String, String> properties = new java.util.HashMap<>();
+        if (greenplumSink.getProperties() != null) {
+            greenplumSink.getProperties().forEach((key, value) -> {
+                if (value != null) {
+                    properties.put(key, value.toString());
+                }
+            });
+        }
+
+        return new GreenplumLoadNode(
+                id,
+                name,
+                fields,
+                fieldRelations,
+                null,
+                null,
+                1,
+                properties,
+                greenplumSink.getJdbcUrl(),
+                greenplumSink.getUsername(),
+                greenplumSink.getPassword(),
+                greenplumSink.getTableName(),
+                greenplumSink.getPrimaryKey());
+    }
+
/**
* Parse information field of data sink.
*/
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/GreenplumStreamSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/GreenplumStreamSinkServiceTest.java
new file mode 100644
index 000000000..ec61cca0f
--- /dev/null
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/GreenplumStreamSinkServiceTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.sink;
+
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.sink.greenplum.GreenplumSink;
+import
org.apache.inlong.manager.common.pojo.sink.greenplum.GreenplumSinkRequest;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests saving, getting, updating and deleting a Greenplum sink through the stream sink service.
+ */
+public class GreenplumStreamSinkServiceTest extends ServiceBaseTest {
+
+    private static final String GROUP_ID = "b_group1";
+    private static final String STREAM_ID = "stream1";
+    private static final String OPERATOR = "admin";
+    private static final String FIELD_NAME = "greenplum_field";
+    private static final String FIELD_TYPE = "greenplum_type";
+    private static final Integer FIELD_ID = 1;
+
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private InlongStreamServiceTest streamServiceTest;
+
+    /**
+     * Save the stream, then a Greenplum sink with a single field on it.
+     *
+     * @param sinkName name of the sink to save
+     * @return id of the saved sink
+     */
+    public Integer saveSink(String sinkName) {
+        streamServiceTest.saveInlongStream(GROUP_ID, STREAM_ID, OPERATOR);
+
+        // The only field of the sink
+        SinkField field = new SinkField();
+        field.setFieldName(FIELD_NAME);
+        field.setFieldType(FIELD_TYPE);
+        field.setId(FIELD_ID);
+        List<SinkField> fields = new ArrayList<>();
+        fields.add(field);
+
+        GreenplumSinkRequest sinkRequest = new GreenplumSinkRequest();
+        sinkRequest.setInlongGroupId(GROUP_ID);
+        sinkRequest.setInlongStreamId(STREAM_ID);
+        sinkRequest.setSinkType(SinkType.SINK_GREENPLUM);
+        sinkRequest.setSinkName(sinkName);
+        sinkRequest.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+
+        // Greenplum connection settings
+        sinkRequest.setJdbcUrl("jdbc:postgresql://localhost:5432/greenplum");
+        sinkRequest.setUsername("greenplum");
+        sinkRequest.setPassword("inlong");
+        sinkRequest.setTableName("user");
+        sinkRequest.setPrimaryKey("name,age");
+
+        sinkRequest.setFieldList(fields);
+        return sinkService.save(sinkRequest, OPERATOR);
+    }
+
+    /**
+     * Delete the Greenplum sink with the given id and assert that the deletion succeeded.
+     *
+     * @param greenplumSinkId id of the sink to delete
+     */
+    public void deleteGreenplumSink(Integer greenplumSinkId) {
+        Assert.assertTrue(sinkService.delete(greenplumSinkId, OPERATOR));
+    }
+
+    @Test
+    public void testListByIdentifier() {
+        Integer sinkId = this.saveSink("greenplum_default1");
+        StreamSink saved = sinkService.get(sinkId);
+        Assert.assertEquals(GROUP_ID, saved.getInlongGroupId());
+        deleteGreenplumSink(sinkId);
+    }
+
+    @Test
+    public void testGetAndUpdate() {
+        Integer sinkId = this.saveSink("greenplum_default2");
+        StreamSink saved = sinkService.get(sinkId);
+        Assert.assertEquals(GROUP_ID, saved.getInlongGroupId());
+
+        // Flip the create-resource switch and push the sink back as an update request
+        GreenplumSink greenplumSink = (GreenplumSink) saved;
+        greenplumSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        SinkRequest updateRequest = greenplumSink.genSinkRequest();
+        Assert.assertTrue(sinkService.update(updateRequest, OPERATOR));
+        deleteGreenplumSink(sinkId);
+    }
+
+}