This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a97e9e4e4 [INLONG-7265][Manager] Support register and manage the
resource of Kudu sink (#7270)
a97e9e4e4 is described below
commit a97e9e4e4336910fab8857d17eeda8b7f53cfe39
Author: feat <[email protected]>
AuthorDate: Tue Mar 14 13:13:29 2023 +0800
[INLONG-7265][Manager] Support register and manage the resource of Kudu
sink (#7270)
Co-authored-by: healchow <[email protected]>
---
.../inlong/manager/common/consts/SinkType.java | 1 +
.../manager/pojo/sink/kudu/KuduColumnInfo.java | 75 ++++++++++
.../inlong/manager/pojo/sink/kudu/KuduSink.java | 63 ++++++++
.../inlong/manager/pojo/sink/kudu/KuduSinkDTO.java | 86 +++++++++++
.../manager/pojo/sink/kudu/KuduSinkRequest.java | 45 ++++++
.../manager/pojo/sink/kudu/KuduTableInfo.java | 37 +++++
.../inlong/manager/pojo/sink/kudu/KuduType.java | 78 ++++++++++
.../manager/pojo/sort/util/LoadNodeUtils.java | 26 ++++
.../service/sink/kudu/KuduSinkOperator.java | 164 +++++++++++++++++++++
.../sort/protocol/node/load/KuduLoadNode.java | 27 ++--
10 files changed, 586 insertions(+), 16 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index c7e293ed6..a74c9c337 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -38,5 +38,6 @@ public class SinkType {
public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
public static final String DORIS = "DORIS";
public static final String STARROCKS = "STARROCKS";
+ public static final String KUDU = "KUDU";
public static final String REDIS = "REDIS";
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduColumnInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduColumnInfo.java
new file mode 100644
index 000000000..9afeea165
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduColumnInfo.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Kudu column info
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonTypeDefine(value = SinkType.KUDU)
+public class KuduColumnInfo extends SinkField {
+
+ private Integer length;
+
+ private Integer precision;
+
+ private Integer scale;
+
+ private String partitionStrategy;
+
+ private Integer bucketNum;
+
+ private Integer width;
+
+ /**
+ * Get the extra param from the Json
+ */
+ public static KuduColumnInfo getFromJson(@NotNull String extParams) {
+ if (StringUtils.isEmpty(extParams)) {
+ return new KuduColumnInfo();
+ }
+ try {
+ return JsonUtils.parseObject(extParams, KuduColumnInfo.class);
+ } catch (Exception e) {
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ }
+ }
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static KuduColumnInfo getFromRequest(SinkField sinkField) {
+ return CommonBeanUtils.copyProperties(sinkField, KuduColumnInfo::new,
true);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
new file mode 100644
index 000000000..871ce2f98
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+/**
+ * Kudu sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Kudu sink info")
+@JsonTypeDefine(value = SinkType.KUDU)
+public class KuduSink extends StreamSink {
+
+ @ApiModelProperty("Kudu masters, a comma separated list of 'host:port'
pairs")
+ private String masters;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Partition field list")
+ private String partitionKey;
+
+ public KuduSink() {
+ this.setSinkType(SinkType.KUDU);
+ }
+
+ @Override
+ public SinkRequest genSinkRequest() {
+ return CommonBeanUtils.copyProperties(this, KuduSinkRequest::new);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
new file mode 100644
index 000000000..e7351d81c
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkDTO.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import javax.validation.constraints.NotNull;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Kudu sink info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class KuduSinkDTO {
+
+ @ApiModelProperty("Kudu masters, a comma separated list of 'host:port'
pairs")
+ private String masters;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Properties for Kudu")
+ private Map<String, Object> properties;
+
+ @ApiModelProperty("Partition field list")
+ private String partitionKey;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static KuduSinkDTO getFromRequest(KuduSinkRequest request) {
+ return KuduSinkDTO.builder()
+ .tableName(request.getTableName())
+ .masters(request.getMasters())
+ .properties(request.getProperties())
+ .build();
+ }
+
+ public static KuduSinkDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, KuduSinkDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of Kudu SinkDTO failure:
%s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Get Kudu table info
+ */
+ public static KuduTableInfo getKuduTableInfo(KuduSinkDTO kuduInfo,
List<KuduColumnInfo> columnList) {
+ KuduTableInfo tableInfo = new KuduTableInfo();
+ tableInfo.setTableName(kuduInfo.getTableName());
+ tableInfo.setMasters(kuduInfo.getMasters());
+ tableInfo.setColumns(columnList);
+ tableInfo.setTblProperties(kuduInfo.getProperties());
+ return tableInfo;
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
new file mode 100644
index 000000000..ab6592f96
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduSinkRequest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+/**
+ * Kudu sink request.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Kudu sink request")
+@JsonTypeDefine(value = SinkType.KUDU)
+public class KuduSinkRequest extends SinkRequest {
+
+ @ApiModelProperty("Kudu masters, a comma separated list of 'host:port'
pairs")
+ private String masters;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
new file mode 100644
index 000000000..46372b996
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduTableInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Kudu table info
+ */
+@Data
+public class KuduTableInfo {
+
+ private String masters;
+ private String tableName;
+ private String tableDesc;
+ private Map<String, Object> tblProperties;
+ private List<KuduColumnInfo> columns;
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduType.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduType.java
new file mode 100644
index 000000000..81030f5aa
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kudu/KuduType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.kudu;
+
+import lombok.Getter;
+
+/**
+ * Kudu data type
+ */
+public enum KuduType {
+
+ BOOLEAN("boolean", "bool"),
+ INT("int", "int32"),
+ LONG("long", "int64"),
+ FLOAT("float", "float"),
+ DOUBLE("double", "double"),
+ DATE("date", "date"),
+ TIMESTAMP("timestamp", "unixtime_micros"),
+ STRING("string", "string"),
+ BINARY("binary", "binary"),
+ FIXED("fixed", null),
+ DECIMAL("decimal", "decimal"),
+ ;
+
+ @Getter
+ private final String type;
+
+ @Getter
+ private final String kuduType;
+
+ KuduType(String type, String kuduType) {
+ this.type = type;
+ this.kuduType = kuduType;
+ }
+
+ /**
+ * Get type from name
+ */
+ public static KuduType forType(String type) {
+ for (KuduType ibType : values()) {
+ if (ibType.getType().equalsIgnoreCase(type)) {
+ return ibType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("invalid type = %s",
type));
+ }
+
+ /**
+ * Get type by Kudu type name, matching by value rather than by reference
+ */
+ public static final KuduType forKuduType(String kuduType) {
+ for (KuduType type : values()) {
+ if (java.util.Objects.equals(type.getKuduType(), kuduType)) {
+ return type;
+ }
+ }
+ throw new IllegalArgumentException(String.format("invalid kudu type =
%s", kuduType));
+ }
+
+ public String kuduType() {
+ return kuduType;
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 05a63abc4..fa32b79ab 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -38,6 +38,7 @@ import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSink;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
@@ -70,6 +71,7 @@ import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.node.load.KuduLoadNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
@@ -149,6 +151,8 @@ public class LoadNodeUtils {
return createLoadNode((DorisSink) streamSink, fieldInfos,
fieldRelations, properties);
case SinkType.STARROCKS:
return createLoadNode((StarRocksSink) streamSink, fieldInfos,
fieldRelations, properties);
+ case SinkType.KUDU:
+ return createLoadNode((KuduSink) streamSink, fieldInfos,
fieldRelations, properties);
case SinkType.REDIS:
return createLoadNode((RedisSink) streamSink, fieldInfos,
fieldRelations, properties);
default:
@@ -386,6 +390,28 @@ public class LoadNodeUtils {
starRocksSink.getTablePattern());
}
+ /**
+ * Create load node of Kudu.
+ */
+ public static KuduLoadNode createLoadNode(
+ KuduSink kuduSink,
+ List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations,
+ Map<String, String> properties) {
+ return new KuduLoadNode(
+ kuduSink.getSinkName(),
+ kuduSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ kuduSink.getMasters(),
+ kuduSink.getTableName(),
+ kuduSink.getPartitionKey());
+ }
+
private static LoadNode createLoadNode(
RedisSink redisSink,
List<FieldInfo> fieldInfos,
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
new file mode 100644
index 000000000..f9122929a
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.kudu;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduColumnInfo;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSink;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSinkDTO;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Kudu sink operator, such as save or update Kudu field, etc.
+ */
+@Service
+public class KuduSinkOperator extends AbstractSinkOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KuduSinkOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.KUDU.equals(sinkType);
+ }
+
+ @Override
+ protected String getSinkType() {
+ return SinkType.KUDU;
+ }
+
+ @Override
+ protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
+ KuduSinkRequest sinkRequest = (KuduSinkRequest) request;
+ String masters = sinkRequest.getMasters();
+ if (StringUtils.isBlank(masters)) {
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
"masters can not be empty!");
+ }
+ if (masters.contains(InlongConstants.SEMICOLON)) { // masters must be a comma separated list
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
"masters can not contain semicolon!");
+ }
+
+ try {
+ KuduSinkDTO dto = KuduSinkDTO.getFromRequest(sinkRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Kudu SinkDTO
failure: %s", e.getMessage()));
+ }
+
+ }
+
+ @Override
+ public StreamSink getFromEntity(StreamSinkEntity entity) {
+ KuduSink sink = new KuduSink();
+ if (entity == null) {
+ return sink;
+ }
+
+ KuduSinkDTO dto = KuduSinkDTO.getFromJson(entity.getExtParams());
+
+ CommonBeanUtils.copyProperties(entity, sink, true);
+ CommonBeanUtils.copyProperties(dto, sink, true);
+ List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+ sink.setSinkFieldList(sinkFields);
+ return sink;
+ }
+
+ @Override
+ public void saveFieldOpt(SinkRequest request) {
+ List<SinkField> fieldList = request.getSinkFieldList();
+ LOGGER.debug("begin to save kudu sink fields={}", fieldList);
+ if (CollectionUtils.isEmpty(fieldList)) {
+ return;
+ }
+ int size = fieldList.size();
+ List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+ String groupId = request.getInlongGroupId();
+ String streamId = request.getInlongStreamId();
+ String sinkType = request.getSinkType();
+ Integer sinkId = request.getId();
+ for (SinkField fieldInfo : fieldList) {
+ this.checkFieldInfo(fieldInfo);
+ StreamSinkFieldEntity fieldEntity =
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+ if (StringUtils.isEmpty(fieldEntity.getFieldComment())) { // default the comment to the field name
+ fieldEntity.setFieldComment(fieldEntity.getFieldName());
+ }
+ try {
+ KuduColumnInfo dto = KuduColumnInfo.getFromRequest(fieldInfo);
+ fieldEntity.setExtParams(objectMapper.writeValueAsString(dto)); // column info kept as JSON
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Kudu
FieldInfo failure: %s", e.getMessage()));
+ }
+ fieldEntity.setInlongGroupId(groupId);
+ fieldEntity.setInlongStreamId(streamId);
+ fieldEntity.setSinkType(sinkType);
+ fieldEntity.setSinkId(sinkId);
+ fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
+ entityList.add(fieldEntity);
+ }
+
+ sinkFieldMapper.insertAll(entityList);
+ LOGGER.debug("success to save kudu sink fields");
+ }
+
+ @Override
+ public List<SinkField> getSinkFields(Integer sinkId) {
+ List<StreamSinkFieldEntity> sinkFieldEntities =
sinkFieldMapper.selectBySinkId(sinkId);
+ List<SinkField> fieldList = new ArrayList<>();
+ if (CollectionUtils.isEmpty(sinkFieldEntities)) {
+ return fieldList;
+ }
+ sinkFieldEntities.forEach(field -> {
+ SinkField sinkField = new SinkField();
+ if (StringUtils.isNotBlank(field.getExtParams())) {
+ KuduColumnInfo kuduColumnInfo = KuduColumnInfo.getFromJson(
+ field.getExtParams());
+ CommonBeanUtils.copyProperties(field, kuduColumnInfo, true);
+ fieldList.add(kuduColumnInfo);
+ } else {
+ CommonBeanUtils.copyProperties(field, sinkField, true);
+ fieldList.add(sinkField);
+ }
+
+ });
+ return fieldList;
+ }
+}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KuduLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KuduLoadNode.java
index 3adb15797..20208ac08 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KuduLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KuduLoadNode.java
@@ -35,7 +35,6 @@ import
org.apache.inlong.sort.protocol.transformation.FilterFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Serializable;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -66,9 +65,6 @@ public class KuduLoadNode extends LoadNode implements
InlongMetric, Serializable
@Nonnull
private String tableName;
- @JsonProperty("extList")
- private List<HashMap<String, String>> extList;
-
@JsonProperty("partitionKey")
private String partitionKey;
@@ -84,12 +80,10 @@ public class KuduLoadNode extends LoadNode implements
InlongMetric, Serializable
@JsonProperty("properties") Map<String, String> properties,
@Nonnull @JsonProperty("masters") String masters,
@Nonnull @JsonProperty("tableName") String tableName,
- @JsonProperty("partitionKey") String partitionKey,
- @JsonProperty("extList") List<HashMap<String, String>> extList) {
+ @JsonProperty("partitionKey") String partitionKey) {
super(id, name, fields, fieldRelations, filters, filterStrategy,
sinkParallelism, properties);
this.tableName = Preconditions.checkNotNull(tableName, "table name is
null");
this.masters = Preconditions.checkNotNull(masters, "masters is null");
- this.extList = extList;
this.partitionKey = partitionKey;
}
@@ -102,15 +96,16 @@ public class KuduLoadNode extends LoadNode implements
InlongMetric, Serializable
// If the extend attributes starts with .ddl,
// it will be passed to the ddl statement of the table
- extList.forEach(ext -> {
- String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
- if (StringUtils.isNoneBlank(keyName) &&
- keyName.startsWith(DDL_ATTR_PREFIX)) {
- String ddlKeyName =
keyName.substring(DDL_ATTR_PREFIX.length());
- String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
- options.put(ddlKeyName, ddlValue);
- }
- });
+ Map<String, String> properties = getProperties();
+ if (properties != null) {
+ properties.forEach((keyName, ddlValue) -> {
+ if (StringUtils.isNotBlank(keyName) &&
+ keyName.startsWith(DDL_ATTR_PREFIX)) {
+ String ddlKeyName =
keyName.substring(DDL_ATTR_PREFIX.length());
+ options.put(ddlKeyName, ddlValue);
+ }
+ });
+ }
options.put("connector", "kudu-inlong");