This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ffb627a01c [INLONG-9362][Manager] Iceberg support config all migrate
(#9363)
ffb627a01c is described below
commit ffb627a01ca5da9ea4e58a1d75d7a6b84bf1d2fd
Author: fuweng11 <[email protected]>
AuthorDate: Thu Nov 30 15:30:07 2023 +0800
[INLONG-9362][Manager] Iceberg support config all migrate (#9363)
---
.../manager/pojo/sink/iceberg/IcebergSink.java | 15 ++++++++
.../manager/pojo/sink/iceberg/IcebergSinkDTO.java | 18 ++++++++++
.../pojo/sink/iceberg/IcebergSinkRequest.java | 15 ++++++++
.../inlong/manager/pojo/sort/node/NodeFactory.java | 4 +++
.../pojo/sort/node/base/ExtractNodeProvider.java | 4 +++
.../pojo/sort/node/base/LoadNodeProvider.java | 13 +++++++
.../pojo/sort/node/provider/IcebergProvider.java | 41 ++++++++++++++++++++--
.../manager/pojo/stream/InlongStreamBriefInfo.java | 3 ++
.../manager/pojo/stream/InlongStreamExtParam.java | 3 ++
.../manager/pojo/stream/InlongStreamInfo.java | 3 ++
.../manager/pojo/stream/InlongStreamRequest.java | 3 ++
.../main/resources/h2/apache_inlong_manager.sql | 3 ++
.../manager-web/sql/apache_inlong_manager.sql | 3 ++
inlong-manager/manager-web/sql/changes-1.10.0.sql | 4 +--
14 files changed, 128 insertions(+), 4 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
index 19b988b79d..5fc2afc586 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
@@ -75,6 +75,21 @@ public class IcebergSink extends StreamSink {
@ApiModelProperty("append mode, UPSERT or APPEND")
private String appendMode;
+ @ApiModelProperty("The multiple enable of sink")
+ private Boolean sinkMultipleEnable = false;
+
+ @ApiModelProperty("The multiple format of sink")
+ private String sinkMultipleFormat;
+
+ @ApiModelProperty("database pattern")
+ private String databasePattern;
+
+ @ApiModelProperty("table pattern")
+ private String tablePattern;
+
+ @ApiModelProperty("enable schema change")
+ private Boolean enableSchemaChange = false;
+
public IcebergSink() {
this.setSinkType(SinkType.ICEBERG);
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
index 2c8dc3f7bf..379aaa0a75 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -68,6 +68,24 @@ public class IcebergSinkDTO {
@ApiModelProperty("Partition type, like: H-hour, D-day, W-week, M-month,
O-once, R-regulation")
private String partitionType;
+ @ApiModelProperty("The multiple enable of sink")
+ private Boolean sinkMultipleEnable = false;
+
+ @ApiModelProperty("The multiple format of sink")
+ private String sinkMultipleFormat;
+
+ @ApiModelProperty("database pattern")
+ private String databasePattern;
+
+ @ApiModelProperty("table pattern")
+ private String tablePattern;
+
+ @ApiModelProperty("append mode, UPSERT or APPEND")
+ private String appendMode;
+
+ @ApiModelProperty("enable schema change")
+ private Boolean enableSchemaChange = false;
+
@ApiModelProperty("Primary key")
private String primaryKey;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
index aa3c606b3f..68bb94cc68 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -70,4 +70,19 @@ public class IcebergSinkRequest extends SinkRequest {
@Pattern(regexp = "(?i)(UPSERT|APPEND)", message = "Invalid append mode")
private String appendMode;
+ @ApiModelProperty("The multiple enable of sink")
+ private Boolean sinkMultipleEnable = false;
+
+ @ApiModelProperty("The multiple format of sink")
+ private String sinkMultipleFormat;
+
+ @ApiModelProperty("database pattern")
+ private String databasePattern;
+
+ @ApiModelProperty("table pattern")
+ private String tablePattern;
+
+ @ApiModelProperty("enable schema change")
+ private Boolean enableSchemaChange = false;
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
index fdf68e0ce9..88a7992f78 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
@@ -103,6 +103,10 @@ public class NodeFactory {
sourceInfo.getSourceType());
LoadNodeProvider loadNodeProvider =
LoadNodeProviderFactory.getLoadNodeProvider(sinkInfo.getSinkType());
+ if (loadNodeProvider.isSinkMultiple(sinkInfo)) {
+
sourceInfo.setFieldList(loadNodeProvider.addStreamFieldsForSinkMultiple(sourceInfo.getFieldList()));
+
sinkInfo.setSinkFieldList(loadNodeProvider.addSinkFieldsForSinkMultiple(sinkInfo.getSinkFieldList()));
+ }
if (FieldInfoUtils.compareFields(extractNodeProvider.getMetaFields(),
loadNodeProvider.getMetaFields())) {
extractNodeProvider.addStreamMetaFields(sourceInfo.getFieldList());
if (CollectionUtils.isNotEmpty(transformResponses)) {
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index 42b36b7591..d7e3c89648 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -34,6 +34,7 @@ import
org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.format.RawFormat;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
@@ -75,6 +76,9 @@ public interface ExtractNodeProvider extends NodeProvider {
*/
default List<FieldInfo> parseStreamFieldInfos(List<StreamField>
streamFields, String nodeId,
FieldTypeMappingStrategy fieldTypeMappingStrategy) {
+ if (CollectionUtils.isEmpty(streamFields)) {
+ return null;
+ }
// Filter constant fields
return streamFields.stream().filter(s ->
Objects.isNull(s.getFieldValue()))
.map(streamFieldInfo -> FieldInfoUtils
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
index 4af5e532d4..87f82e7185 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -40,6 +40,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -145,4 +146,16 @@ public interface LoadNodeProvider extends NodeProvider {
default List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) {
return sinkFields;
}
+
+ default Boolean isSinkMultiple(StreamNode nodeInfo) {
+ return false;
+ }
+
+ default List<StreamField> addStreamFieldsForSinkMultiple(List<StreamField>
streamFields) {
+ return new ArrayList<>();
+ }
+
+ default List<SinkField> addSinkFieldsForSinkMultiple(List<SinkField>
sinkFields) {
+ return new ArrayList<>();
+ }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
index 2147c3159a..37302bc396 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -33,10 +33,12 @@ import
org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
+import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
@@ -82,7 +84,8 @@ public class IcebergProvider implements ExtractNodeProvider,
LoadNodeProvider {
List<FieldInfo> fieldInfos =
parseSinkFieldInfos(icebergSink.getSinkFieldList(), icebergSink.getSinkName());
List<FieldRelation> fieldRelations =
parseSinkFields(icebergSink.getSinkFieldList(), constantFieldMap);
IcebergConstant.CatalogType catalogType =
CatalogType.forName(icebergSink.getCatalogType());
-
+ Format format =
parsingSinkMultipleFormat(icebergSink.getSinkMultipleEnable(),
+ icebergSink.getSinkMultipleFormat());
return new IcebergLoadNode(
icebergSink.getSinkName(),
icebergSink.getSinkName(),
@@ -98,7 +101,13 @@ public class IcebergProvider implements
ExtractNodeProvider, LoadNodeProvider {
catalogType,
icebergSink.getCatalogUri(),
icebergSink.getWarehouse(),
- icebergSink.getAppendMode());
+ icebergSink.getAppendMode(),
+ icebergSink.getSinkMultipleEnable(),
+ format,
+ icebergSink.getDatabasePattern(),
+ icebergSink.getTablePattern(),
+ icebergSink.getEnableSchemaChange(),
+ null);
}
@Override
@@ -128,4 +137,32 @@ public class IcebergProvider implements
ExtractNodeProvider, LoadNodeProvider {
fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(),
MetaField.AUDIT_DATA_TIME));
return fieldInfos;
}
+
+ @Override
+ public Boolean isSinkMultiple(StreamNode nodeInfo) {
+ IcebergSink icebergSink = (IcebergSink) nodeInfo;
+ return icebergSink.getSinkMultipleEnable();
+ }
+
+ @Override
+ public List<StreamField> addStreamFieldsForSinkMultiple(List<StreamField>
streamFields) {
+ if (CollectionUtils.isEmpty(streamFields)) {
+ streamFields = new ArrayList<>();
+ }
+ streamFields.add(0,
+ new StreamField(0, "varbinary",
MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal", null, 1,
+ MetaField.DATA_BYTES_CANAL.name()));
+ return streamFields;
+ }
+
+ @Override
+ public List<SinkField> addSinkFieldsForSinkMultiple(List<SinkField>
sinkFields) {
+ if (CollectionUtils.isEmpty(sinkFields)) {
+ sinkFields = new ArrayList<>();
+ }
+ sinkFields.add(0, new SinkField(0, "varbinary",
MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal",
+ MetaField.DATA_BYTES_CANAL.name(), "varbinary", 0,
MetaField.DATA_BYTES_CANAL.name(), null));
+ return sinkFields;
+ }
+
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
index b8d04fde85..2a3d91e3a4 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
@@ -88,6 +88,9 @@ public class InlongStreamBriefInfo {
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private Boolean ignoreParseError;
+ @ApiModelProperty("The multiple enable of sink")
+ private Boolean sinkMultipleEnable = false;
+
@ApiModelProperty(value = "Status")
private Integer status;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 2690576aad..9b357117c8 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -51,6 +51,9 @@ public class InlongStreamExtParam implements Serializable {
@ApiModelProperty(value = "Predefined fields")
private String predefinedFields;
+ @ApiModelProperty("The multiple enable of sink")
+ private Boolean sinkMultipleEnable = false;
+
@ApiModelProperty(value = "Extended field size")
private Integer extendedFieldSize = 0;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index a8cb701748..57f9bbeba2 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -139,6 +139,9 @@ public class InlongStreamInfo extends BaseInlongStream {
@ApiModelProperty(value = "Extended field size")
private Integer extendedFieldSize = 0;
+ @ApiModelProperty("The multiple enable of sink")
+ private Boolean sinkMultipleEnable;
+
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private Boolean ignoreParseError = true;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index bb6a0d2964..fc2b012925 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -127,6 +127,9 @@ public class InlongStreamRequest extends BaseInlongStream {
@ApiModelProperty(value = "Extended field size")
private Integer extendedFieldSize = 0;
+ @ApiModelProperty("The multiple enable of sink")
+ private Boolean sinkMultipleEnable;
+
@ApiModelProperty(value = "The message body wrap type, including: RAW,
INLONG_MSG_V0, INLONG_MSG_V1, PB, etc")
private String wrapType;
diff --git
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index d27d70d5a3..168c2fc7ca 100644
---
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -302,6 +302,9 @@ CREATE TABLE IF NOT EXISTS `operation_log`
(
`id` int(11) NOT NULL AUTO_INCREMENT,
`authentication_type` varchar(64) DEFAULT NULL COMMENT
'Authentication type',
+ `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong
group id',
+ `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong
stream id',
+ `operation_target` varchar(256) DEFAULT NULL COMMENT 'Operation
target',
`operation_type` varchar(256) DEFAULT NULL COMMENT 'Operation
type',
`http_method` varchar(64) DEFAULT NULL COMMENT 'Request
method',
`invoke_method` varchar(256) DEFAULT NULL COMMENT 'Invoke
method',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index f1b5379e0a..f64c45b63d 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -319,6 +319,9 @@ CREATE TABLE IF NOT EXISTS `operation_log`
(
`id` int(11) NOT NULL AUTO_INCREMENT,
`authentication_type` varchar(64) DEFAULT NULL COMMENT
'Authentication type',
+ `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong
group id',
+ `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong
stream id',
+ `operation_target` varchar(256) DEFAULT NULL COMMENT 'Operation
target',
`operation_type` varchar(256) DEFAULT NULL COMMENT 'Operation
type',
`http_method` varchar(64) DEFAULT NULL COMMENT 'Request
method',
`invoke_method` varchar(256) DEFAULT NULL COMMENT 'Invoke
method',
diff --git a/inlong-manager/manager-web/sql/changes-1.10.0.sql
b/inlong-manager/manager-web/sql/changes-1.10.0.sql
index 3a1abeb04b..a09a83faf7 100644
--- a/inlong-manager/manager-web/sql/changes-1.10.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.10.0.sql
@@ -40,10 +40,10 @@ ALTER TABLE `operation_log`
ADD COLUMN `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong
group id';
ALTER TABLE `operation_log`
- ADD COLUMN `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong
stream id',
+ ADD COLUMN `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong
stream id';
ALTER TABLE `operation_log`
- ADD COLUMN `operation_target` varchar(256) DEFAULT NULL COMMENT 'Operation
target',
+ ADD COLUMN `operation_target` varchar(256) DEFAULT NULL COMMENT 'Operation
target';
CREATE INDEX operation_log_group_stream_index ON operation_log
(`inlong_group_id`, `inlong_stream_id`);