This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a97e9e4e4 [INLONG-7265][Manager] Support register and manage the resource of Kudu sink (#7270)
a97e9e4e4 is described below

commit a97e9e4e4336910fab8857d17eeda8b7f53cfe39
Author: feat <[email protected]>
AuthorDate: Tue Mar 14 13:13:29 2023 +0800

    [INLONG-7265][Manager] Support register and manage the resource of Kudu sink (#7270)
    
    Co-authored-by: healchow <[email protected]>
---
 .../inlong/manager/common/consts/SinkType.java     |   1 +
 .../manager/pojo/sink/kudu/KuduColumnInfo.java     |  75 ++++++++++
 .../inlong/manager/pojo/sink/kudu/KuduSink.java    |  63 ++++++++
 .../inlong/manager/pojo/sink/kudu/KuduSinkDTO.java |  86 +++++++++++
 .../manager/pojo/sink/kudu/KuduSinkRequest.java    |  45 ++++++
 .../manager/pojo/sink/kudu/KuduTableInfo.java      |  37 +++++
 .../inlong/manager/pojo/sink/kudu/KuduType.java    |  78 ++++++++++
 .../manager/pojo/sort/util/LoadNodeUtils.java      |  26 ++++
 .../service/sink/kudu/KuduSinkOperator.java        | 164 +++++++++++++++++++++
 .../sort/protocol/node/load/KuduLoadNode.java      |  27 ++--
 10 files changed, 586 insertions(+), 16 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index c7e293ed6..a74c9c337 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -38,5 +38,6 @@ public class SinkType {
     public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
     public static final String DORIS = "DORIS";
     public static final String STARROCKS = "STARROCKS";
+    public static final String KUDU = "KUDU";
     public static final String REDIS = "REDIS";
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduColumnInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduColumnInfo.java
new file mode 100644
index 000000000..9afeea165
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduColumnInfo.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Kudu column info
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonTypeDefine(value = SinkType.KUDU)
+public class KuduColumnInfo extends SinkField {
+
+    private Integer length;
+
+    private Integer precision;
+
+    private Integer scale;
+
+    private String partitionStrategy;
+
+    private Integer bucketNum;
+
+    private Integer width;
+
+    /**
+     * Get the extra param from the Json
+     */
+    public static KuduColumnInfo getFromJson(@NotNull String extParams) {
+        if (StringUtils.isEmpty(extParams)) {
+            return new KuduColumnInfo();
+        }
+        try {
+            return JsonUtils.parseObject(extParams, KuduColumnInfo.class);
+        } catch (Exception e) {
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+        }
+    }
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static KuduColumnInfo getFromRequest(SinkField sinkField) {
+        return CommonBeanUtils.copyProperties(sinkField, KuduColumnInfo::new, 
true);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
new file mode 100644
index 000000000..871ce2f98
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+/**
+ * Kudu sink info.
+ * <p>
+ * Extends the common {@link StreamSink} with the Kudu-specific settings declared below: the Kudu masters
+ * (a comma separated list of 'host:port' pairs), the target table name and the partition key.
+ * <p>
+ * The no-args constructor presets the sink type to {@link SinkType#KUDU}. {@link #genSinkRequest()} copies
+ * the properties of this sink into a new {@link KuduSinkRequest} via {@code CommonBeanUtils.copyProperties}.
+ * <p>
+ * NOTE(review): {@link KuduSinkRequest} declares no {@code partitionKey}, so that value is presumably not
+ * carried into the generated request - confirm whether this is intended.
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Kudu sink info")
+@JsonTypeDefine(value = SinkType.KUDU)
+public class KuduSink extends StreamSink {
+
+    @ApiModelProperty("Kudu masters, a comma separated list of 'host:port' 
pairs")
+    private String masters;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Partition field list")
+    private String partitionKey;
+
+    public KuduSink() {
+        this.setSinkType(SinkType.KUDU);
+    }
+
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, KuduSinkRequest::new);
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
new file mode 100644
index 000000000..e7351d81c
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import javax.validation.constraints.NotNull;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Kudu sink info: the Kudu-specific settings of a sink, which can be built from a
+ * {@link KuduSinkRequest} or parsed from the JSON ext params of a sink.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class KuduSinkDTO {
+
+    @ApiModelProperty("Kudu masters, a comma separated list of 'host:port' pairs")
+    private String masters;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Properties for Kudu")
+    private Map<String, Object> properties;
+
+    @ApiModelProperty("Partition field list")
+    private String partitionKey;
+
+    /**
+     * Build the DTO from a Kudu sink request.
+     * Only the table name, the masters and the properties are taken over; the partition key stays unset.
+     *
+     * @param request Kudu sink request
+     * @return the DTO filled from the request
+     */
+    public static KuduSinkDTO getFromRequest(KuduSinkRequest request) {
+        KuduSinkDTO dto = new KuduSinkDTO();
+        dto.setTableName(request.getTableName());
+        dto.setMasters(request.getMasters());
+        dto.setProperties(request.getProperties());
+        return dto;
+    }
+
+    /**
+     * Parse the DTO from the JSON ext params.
+     *
+     * @param extParams JSON string of the ext params
+     * @return the parsed DTO
+     * @throws BusinessException with {@link ErrorCodeEnum#SINK_INFO_INCORRECT} if parsing fails
+     */
+    public static KuduSinkDTO getFromJson(@NotNull String extParams) {
+        KuduSinkDTO parsed;
+        try {
+            parsed = JsonUtils.parseObject(extParams, KuduSinkDTO.class);
+        } catch (Exception e) {
+            String detail = String.format("parse extParams of Kudu SinkDTO failure: %s", e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, detail);
+        }
+        return parsed;
+    }
+
+    /**
+     * Assemble the Kudu table info from the sink settings and the column list.
+     *
+     * @param kuduInfo Kudu sink settings providing masters, table name and properties
+     * @param columnList columns of the table
+     * @return the table info
+     */
+    public static KuduTableInfo getKuduTableInfo(KuduSinkDTO kuduInfo, List<KuduColumnInfo> columnList) {
+        KuduTableInfo result = new KuduTableInfo();
+        result.setMasters(kuduInfo.getMasters());
+        result.setTableName(kuduInfo.getTableName());
+        result.setTblProperties(kuduInfo.getProperties());
+        result.setColumns(columnList);
+        return result;
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
new file mode 100644
index 000000000..ab6592f96
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+/**
+ * Kudu sink request.
+ * <p>
+ * Extends the common {@link SinkRequest} with the Kudu-specific settings: {@code masters}, a comma separated
+ * list of 'host:port' pairs of the Kudu masters, and {@code tableName}, the target table name.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Kudu sink request")
+@JsonTypeDefine(value = SinkType.KUDU)
+public class KuduSinkRequest extends SinkRequest {
+
+    @ApiModelProperty("Kudu masters, a comma separated list of 'host:port' 
pairs")
+    private String masters;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
new file mode 100644
index 000000000..46372b996
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Kudu table info.
+ * <p>
+ * Plain data holder: the Kudu {@code masters}, the {@code tableName}, a table description
+ * ({@code tableDesc}), the table properties ({@code tblProperties}) and the list of {@code columns}.
+ * Getters, setters, equals/hashCode and toString are generated by Lombok's {@code @Data}.
+ */
+@Data
+public class KuduTableInfo {
+
+    private String masters;
+    private String tableName;
+    private String tableDesc;
+    private Map<String, Object> tblProperties;
+    private List<KuduColumnInfo> columns;
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduType.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduType.java
new file mode 100644
index 000000000..81030f5aa
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import lombok.Getter;
+
+/**
+ * Kudu data type
+ */
+public enum KuduType {
+
+    BOOLEAN("boolean", "bool"),
+    INT("int", "int32"),
+    LONG("long", "int64"),
+    FLOAT("float", "float"),
+    DOUBLE("double", "double"),
+    DATE("date", "date"),
+    TIMESTAMP("timestamp", "unixtime_micros"),
+    STRING("string", "string"),
+    BINARY("binary", "binary"),
+    FIXED("fixed", null),
+    DECIMAL("decimal", "decimal"),
+    ;
+
+    @Getter
+    private final String type;
+
+    @Getter
+    private final String kuduType;
+
+    KuduType(String type, String kuduType) {
+        this.type = type;
+        this.kuduType = kuduType;
+    }
+
+    /**
+     * Get type from name
+     */
+    public static KuduType forType(String type) {
+        for (KuduType ibType : values()) {
+            if (ibType.getType().equalsIgnoreCase(type)) {
+                return ibType;
+            }
+        }
+        throw new IllegalArgumentException(String.format("invalid type = %s", 
type));
+    }
+
+    /**
+     * Get type by Kudu type name
+     */
+    public static final KuduType forKuduType(String kuduType) {
+        for (KuduType type : values()) {
+            if (type.getKuduType() == kuduType) {
+                return type;
+            }
+        }
+        throw new IllegalArgumentException(String.format("invalid kudu type = 
%s", kuduType));
+    }
+
+    public String kuduType() {
+        return kuduType;
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 05a63abc4..fa32b79ab 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -38,6 +38,7 @@ import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
 import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSink;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
 import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
@@ -70,6 +71,7 @@ import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.node.load.KuduLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
 import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
@@ -149,6 +151,8 @@ public class LoadNodeUtils {
                 return createLoadNode((DorisSink) streamSink, fieldInfos, 
fieldRelations, properties);
             case SinkType.STARROCKS:
                 return createLoadNode((StarRocksSink) streamSink, fieldInfos, 
fieldRelations, properties);
+            case SinkType.KUDU:
+                return createLoadNode((KuduSink) streamSink, fieldInfos, 
fieldRelations, properties);
             case SinkType.REDIS:
                 return createLoadNode((RedisSink) streamSink, fieldInfos, 
fieldRelations, properties);
             default:
@@ -386,6 +390,28 @@ public class LoadNodeUtils {
                 starRocksSink.getTablePattern());
     }
 
+    /**
+     * Build the load node of Kudu for the given Kudu sink.
+     *
+     * @param kuduSink Kudu sink providing the sink name, masters, table name and partition key
+     * @param fieldInfos fields of the load node
+     * @param fieldRelations relations of the fields
+     * @param properties properties passed on to the load node
+     * @return the Kudu load node, using the sink name as both its id and its name
+     */
+    public static KuduLoadNode createLoadNode(
+            KuduSink kuduSink,
+            List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations,
+            Map<String, String> properties) {
+        String nodeId = kuduSink.getSinkName();
+        String nodeName = kuduSink.getSinkName();
+        String masters = kuduSink.getMasters();
+        String tableName = kuduSink.getTableName();
+        String partitionKey = kuduSink.getPartitionKey();
+        // The three nulls are the filters, the filter strategy and the sink parallelism, which are left unset
+        return new KuduLoadNode(nodeId, nodeName, fieldInfos, fieldRelations, null, null, null, properties,
+                masters, tableName, partitionKey);
+    }
+
     private static LoadNode createLoadNode(
             RedisSink redisSink,
             List<FieldInfo> fieldInfos,
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
new file mode 100644
index 000000000..f9122929a
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.kudu;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduColumnInfo;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSink;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSinkDTO;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Kudu sink operator, such as save or update Kudu field, etc.
+ */
+@Service
+public class KuduSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KuduSinkOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * Tell whether this operator handles the given sink type.
+     *
+     * @param sinkType sink type to check
+     * @return true only for {@link SinkType#KUDU}
+     */
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.KUDU.equals(sinkType);
+    }
+
+    /**
+     * Get the sink type handled by this operator.
+     *
+     * @return {@link SinkType#KUDU}
+     */
+    @Override
+    protected String getSinkType() {
+        return SinkType.KUDU;
+    }
+
+    /**
+     * Validate the Kudu masters of the request and store the Kudu settings as JSON ext params of the entity.
+     *
+     * @param request sink request, expected to be a {@link KuduSinkRequest}
+     * @param targetEntity entity whose ext params are set
+     * @throws BusinessException with {@link ErrorCodeEnum#SINK_SAVE_FAILED} if the masters are blank,
+     *         contain a semicolon, or the settings cannot be serialized
+     */
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+        KuduSinkRequest sinkRequest = (KuduSinkRequest) request;
+        String masters = sinkRequest.getMasters();
+        if (StringUtils.isBlank(masters)) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, "masters can not be empty!");
+        }
+        // Masters are a comma separated list, so it is the semicolon that is rejected here
+        if (masters.contains(InlongConstants.SEMICOLON)) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, "masters can not contain semicolon!");
+        }
+
+        try {
+            KuduSinkDTO dto = KuduSinkDTO.getFromRequest(sinkRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Kudu SinkDTO failure: %s", e.getMessage()));
+        }
+
+    }
+
+    /**
+     * Build the Kudu sink from the stored entity, its ext params and its sink fields.
+     *
+     * @param entity stored sink entity, may be null
+     * @return the Kudu sink; an empty one when the entity is null
+     */
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        KuduSink sink = new KuduSink();
+        if (entity == null) {
+            return sink;
+        }
+
+        KuduSinkDTO dto = KuduSinkDTO.getFromJson(entity.getExtParams());
+
+        CommonBeanUtils.copyProperties(entity, sink, true);
+        CommonBeanUtils.copyProperties(dto, sink, true);
+        // NOTE(review): this calls the parent implementation, not the getSinkFields override of this class,
+        // so the Kudu-specific column info is presumably not restored here - confirm whether this is intended
+        List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+        sink.setSinkFieldList(sinkFields);
+        return sink;
+    }
+
+    /**
+     * Save the sink fields of the request, storing each field serialized as JSON in its ext params.
+     * Does nothing when the request has no sink fields.
+     *
+     * @param request sink request holding the fields to save
+     * @throws BusinessException with {@link ErrorCodeEnum#SINK_SAVE_FAILED} if a field cannot be serialized
+     */
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkField> fieldList = request.getSinkFieldList();
+        LOGGER.debug("begin to save kudu sink fields={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+        int size = fieldList.size();
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+        for (SinkField fieldInfo : fieldList) {
+            this.checkFieldInfo(fieldInfo);
+            StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+            // Fall back to the field name when no comment was given
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            try {
+                KuduColumnInfo dto = KuduColumnInfo.getFromRequest(fieldInfo);
+                fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            } catch (Exception e) {
+                throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                        String.format("serialize extParams of Kudu FieldInfo failure: %s", e.getMessage()));
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.debug("success to save kudu sink fields");
+    }
+
+    /**
+     * Get the sink fields of the given sink.
+     * Fields stored with ext params are returned as {@link KuduColumnInfo}, the others as plain {@link SinkField}.
+     *
+     * @param sinkId id of the sink
+     * @return the sink fields, empty when none are stored
+     */
+    @Override
+    public List<SinkField> getSinkFields(Integer sinkId) {
+        List<StreamSinkFieldEntity> sinkFieldEntities = sinkFieldMapper.selectBySinkId(sinkId);
+        List<SinkField> fieldList = new ArrayList<>();
+        if (CollectionUtils.isEmpty(sinkFieldEntities)) {
+            return fieldList;
+        }
+        for (StreamSinkFieldEntity field : sinkFieldEntities) {
+            // Create only the object that is actually returned for this field
+            SinkField sinkField = StringUtils.isNotBlank(field.getExtParams())
+                    ? KuduColumnInfo.getFromJson(field.getExtParams())
+                    : new SinkField();
+            CommonBeanUtils.copyProperties(field, sinkField, true);
+            fieldList.add(sinkField);
+        }
+        return fieldList;
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KuduLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KuduLoadNode.java
index 3adb15797..20208ac08 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KuduLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KuduLoadNode.java
@@ -35,7 +35,6 @@ import 
org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.Serializable;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -66,9 +65,6 @@ public class KuduLoadNode extends LoadNode implements 
InlongMetric, Serializable
     @Nonnull
     private String tableName;
 
-    @JsonProperty("extList")
-    private List<HashMap<String, String>> extList;
-
     @JsonProperty("partitionKey")
     private String partitionKey;
 
@@ -84,12 +80,10 @@ public class KuduLoadNode extends LoadNode implements 
InlongMetric, Serializable
             @JsonProperty("properties") Map<String, String> properties,
             @Nonnull @JsonProperty("masters") String masters,
             @Nonnull @JsonProperty("tableName") String tableName,
-            @JsonProperty("partitionKey") String partitionKey,
-            @JsonProperty("extList") List<HashMap<String, String>> extList) {
+            @JsonProperty("partitionKey") String partitionKey) {
         super(id, name, fields, fieldRelations, filters, filterStrategy, 
sinkParallelism, properties);
         this.tableName = Preconditions.checkNotNull(tableName, "table name is 
null");
         this.masters = Preconditions.checkNotNull(masters, "masters is null");
-        this.extList = extList;
         this.partitionKey = partitionKey;
     }
 
@@ -102,15 +96,16 @@ public class KuduLoadNode extends LoadNode implements 
InlongMetric, Serializable
 
         // If the extend attributes starts with .ddl,
         // it will be passed to the ddl statement of the table
-        extList.forEach(ext -> {
-            String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
-            if (StringUtils.isNoneBlank(keyName) &&
-                    keyName.startsWith(DDL_ATTR_PREFIX)) {
-                String ddlKeyName = 
keyName.substring(DDL_ATTR_PREFIX.length());
-                String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
-                options.put(ddlKeyName, ddlValue);
-            }
-        });
+        Map<String, String> properties = getProperties();
+        if (properties != null) {
+            properties.forEach((keyName, ddlValue) -> {
+                if (StringUtils.isNotBlank(keyName) &&
+                        keyName.startsWith(DDL_ATTR_PREFIX)) {
+                    String ddlKeyName = 
keyName.substring(DDL_ATTR_PREFIX.length());
+                    options.put(ddlKeyName, ddlValue);
+                }
+            });
+        }
 
         options.put("connector", "kudu-inlong");
 

Reply via email to