This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ba64f0ad8 [INLONG-4034][Manager] Add originFieldName in StreamField
(#4037)
ba64f0ad8 is described below
commit ba64f0ad84cbce7ad099022facfb2477a2f8448d
Author: kipshi <[email protected]>
AuthorDate: Fri Apr 29 20:08:06 2022 +0800
[INLONG-4034][Manager] Add originFieldName in StreamField (#4037)
---
.../manager/common/pojo/stream/StreamField.java | 11 +++++
.../dao/entity/StreamTransformFieldEntity.java | 2 +
.../mappers/StreamTransformFieldEntityMapper.xml | 52 ++++++++++++++--------
.../service/sort/util/ExtractNodeUtils.java | 6 ++-
.../service/sort/util/FieldRelationShipUtils.java | 19 +++++++-
.../main/resources/sql/apache_inlong_manager.sql | 31 ++++++-------
.../manager-web/sql/apache_inlong_manager.sql | 31 ++++++-------
7 files changed, 101 insertions(+), 51 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
index e2e4b1efc..381576015 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
@@ -56,6 +56,14 @@ public class StreamField {
this.originNodeName = originNodeName;
}
+ public StreamField(int index, FieldType fieldType, String fieldName,
String fieldComment, String fieldValue,
+ Integer isMetaField, String originNodeName, String
originFieldName) {
+ this(index, fieldType, fieldName, fieldComment, fieldValue);
+ this.isMetaField = isMetaField;
+ this.originNodeName = originNodeName;
+ this.originFieldName = originFieldName;
+ }
+
@ApiModelProperty("Field index")
private Integer id;
@@ -81,4 +89,7 @@ public class StreamField {
@ApiModelProperty("Origin Node name which stream field belongs")
private String originNodeName;
+ @ApiModelProperty("Origin field name before transform operation")
+ private String originFieldName;
+
}
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
index d88eea123..a7d72b3e1 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
@@ -53,4 +53,6 @@ public class StreamTransformFieldEntity implements
Serializable {
private Integer isDeleted;
private String originNodeName;
+
+ private String originFieldName;
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
index c0e8fff14..2ea774647 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
@@ -36,12 +36,13 @@
<result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
<result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
<result column="origin_node_name" jdbcType="VARCHAR"
property="originNodeName"/>
+ <result column="origin_field_name" jdbcType="VARCHAR"
property="originFieldName"/>
</resultMap>
<sql id="Base_Column_List">
id
, inlong_group_id, inlong_stream_id, transform_id, transform_type,
field_name,
field_value, pre_expression, field_type, field_comment, is_meta_field,
field_format,
- rank_num, is_deleted, origin_node_name
+ rank_num, is_deleted, origin_node_name, origin_field_name
</sql>
<select id="selectByTransformId"
resultType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
select
@@ -77,12 +78,14 @@
transform_id, transform_type,
field_name,
field_value, pre_expression,
field_type,
field_comment, is_meta_field,
field_format,
- rank_num, is_deleted,
origin_node_name)
+ rank_num, is_deleted,
origin_node_name,
+ origin_field_name)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
#{inlongStreamId,jdbcType=VARCHAR},
#{transformId,jdbcType=INTEGER},
#{transformType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
#{fieldValue,jdbcType=VARCHAR},
#{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
#{fieldComment,jdbcType=VARCHAR},
#{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
- #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER},
#{originNodeName,jdbcType=VARCHAR})
+ #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER},
#{originNodeName,jdbcType=VARCHAR},
+ #{originFieldName,jdbcType=VARCHAR})
</insert>
<insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
@@ -133,6 +136,9 @@
<if test="originNodeName != null">
origin_node_name,
</if>
+ <if test="originFieldName != null">
+ origin_field_name,
+ </if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">
@@ -180,6 +186,9 @@
<if test="originNodeName != null">
#{originNodeName,jdbcType=VARCHAR},
</if>
+ <if test="originFieldName != null">
+ #{originFieldName,jdbcType=VARCHAR},
+ </if>
</trim>
</insert>
<update id="updateByPrimaryKeySelective"
@@ -228,25 +237,29 @@
<if test="originNodeName != null">
origin_node_name = #{originNodeName,jdbcType=VARCHAR},
</if>
+ <if test="originFieldName != null">
+ origin_field_name = #{originFieldName,jdbcType=VARCHAR},
+ </if>
</set>
where id = #{id,jdbcType=INTEGER}
</update>
<update id="updateByPrimaryKey"
parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
update stream_transform_field
- set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- transform_id = #{transformId,jdbcType=INTEGER},
- transform_type = #{transformType,jdbcType=VARCHAR},
- field_name = #{fieldName,jdbcType=VARCHAR},
- field_value = #{fieldValue,jdbcType=VARCHAR},
- pre_expression = #{preExpression,jdbcType=VARCHAR},
- field_type = #{fieldType,jdbcType=VARCHAR},
- field_comment = #{fieldComment,jdbcType=VARCHAR},
- is_meta_field = #{isMetaField,jdbcType=SMALLINT},
- field_format = #{fieldFormat,jdbcType=VARCHAR},
- rank_num = #{rankNum,jdbcType=SMALLINT},
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- origin_node_name = #{originNodeName,jdbcType=VARCHAR}
+ set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ transform_id = #{transformId,jdbcType=INTEGER},
+ transform_type = #{transformType,jdbcType=VARCHAR},
+ field_name = #{fieldName,jdbcType=VARCHAR},
+ field_value = #{fieldValue,jdbcType=VARCHAR},
+ pre_expression = #{preExpression,jdbcType=VARCHAR},
+ field_type = #{fieldType,jdbcType=VARCHAR},
+ field_comment = #{fieldComment,jdbcType=VARCHAR},
+ is_meta_field = #{isMetaField,jdbcType=SMALLINT},
+ field_format = #{fieldFormat,jdbcType=VARCHAR},
+ rank_num = #{rankNum,jdbcType=SMALLINT},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ origin_node_name = #{originNodeName,jdbcType=VARCHAR},
+ origin_field_name = #{originFieldName,jdbcType=VARCHAR}
where id = #{id,jdbcType=INTEGER}
</update>
@@ -255,7 +268,8 @@
transform_id, transform_type, field_name,
field_value, pre_expression, field_type,
field_comment, is_meta_field, field_format,
- rank_num, is_deleted, origin_node_name)
+ rank_num, is_deleted, origin_node_name,
+ origin_field_name)
values
<foreach collection="list" index="index" item="item" separator=",">
(#{item.id,jdbcType=INTEGER},
#{item.inlongGroupId,jdbcType=VARCHAR},
@@ -267,7 +281,7 @@
#{item.fieldComment,jdbcType=VARCHAR},
#{item.isMetaField,jdbcType=SMALLINT},
#{item.fieldFormat,jdbcType=VARCHAR},
#{item.rankNum,jdbcType=SMALLINT},
#{item.isDeleted,jdbcType=INTEGER},
- #{item.originNodeName,jdbcType=VARCHAR})
+ #{item.originNodeName,jdbcType=VARCHAR},
#{item.originFieldName,jdbcType=VARCHAR})
</foreach>
</insert>
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 5f4681c3d..fba6fb566 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -97,6 +97,10 @@ public class ExtractNodeUtils {
.map(streamFieldInfo ->
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
.collect(Collectors.toList());
String serverTimeZone = binlogSourceResponse.getServerTimezone();
+ boolean incrementalSnapshotEnabled = true;
+ if (binlogSourceResponse.isAllMigration()) {
+ incrementalSnapshotEnabled = false;
+ }
return new MySqlExtractNode(id,
name,
fieldInfos,
@@ -110,7 +114,7 @@ public class ExtractNodeUtils {
database,
port,
serverId,
- true,
+ incrementalSnapshotEnabled,
serverTimeZone);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
index 2e344dcca..20d7310bb 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
@@ -31,6 +31,7 @@ import
org.apache.inlong.manager.common.pojo.transform.splitter.SplitterDefiniti
import
org.apache.inlong.manager.common.pojo.transform.splitter.SplitterDefinition.SplitRule;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.util.StreamParseUtils;
+import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -61,8 +62,9 @@ public class FieldRelationShipUtils {
return createReplacerFieldRelationShips(fieldList,
transformName, replacerDefinition, preNodes);
case DE_DUPLICATION:
case FILTER:
- case JOINER:
return createFieldRelationShips(fieldList, transformName);
+ case JOINER:
+ return createJoinerFieldRelationShips(fieldList,
transformName);
default:
throw new UnsupportedOperationException(
String.format("Unsupported transformType=%s for
Inlong", transformType));
@@ -81,6 +83,21 @@ public class FieldRelationShipUtils {
}).collect(Collectors.toList());
}
+ private static List<FieldRelationShip>
createJoinerFieldRelationShips(List<StreamField> fieldList,
+ String transformName) {
+ return fieldList.stream()
+ .map(streamFieldInfo -> {
+ FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
+ streamFieldInfo.getFieldType().name(),
+ streamFieldInfo.getFieldFormat());
+ FieldInfo inputField = new
FieldInfo(streamFieldInfo.getOriginFieldName(),
+ streamFieldInfo.getOriginNodeName(), formatInfo);
+ FieldInfo outputField = new
FieldInfo(streamFieldInfo.getFieldName(),
+ transformName, formatInfo);
+ return new FieldRelationShip(inputField, outputField);
+ }).collect(Collectors.toList());
+ }
+
private static List<FieldRelationShip>
createSplitterFieldRelationShips(List<StreamField> fieldList,
String transformName, SplitterDefinition splitterDefinition,
String preNodes) {
Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not
be null");
diff --git
a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index c86a00abc..d182ba63e 100644
---
a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -621,21 +621,22 @@ CREATE TABLE `stream_source_field`
DROP TABLE IF EXISTS `stream_transform_field`;
CREATE TABLE `stream_transform_field`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
- `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
- `transform_id` int(11) NOT NULL COMMENT 'Transform id',
- `transform_type` varchar(15) NOT NULL COMMENT 'Transform type',
- `field_name` varchar(50) NOT NULL COMMENT 'Field name',
- `field_value` varchar(128) DEFAULT NULL COMMENT 'Field value,
required if it is a predefined field',
- `pre_expression` varchar(256) DEFAULT NULL COMMENT 'Pre-defined field
value expression',
- `field_type` varchar(50) NOT NULL COMMENT 'Field type',
- `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
- `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta
field? 0: no, 1: yes',
- `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format,
including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such
as yyyy-MM-dd HH:mm:ss',
- `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end
display field order)',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not
deleted, > 0: deleted',
- `origin_node_name` varchar(256) DEFAULT '' COMMENT 'Origin Node name
which stream field belongs',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `transform_id` int(11) NOT NULL COMMENT 'Transform id',
+ `transform_type` varchar(15) NOT NULL COMMENT 'Transform type',
+ `field_name` varchar(50) NOT NULL COMMENT 'Field name',
+ `field_value` varchar(128) DEFAULT NULL COMMENT 'Field value,
required if it is a predefined field',
+ `pre_expression` varchar(256) DEFAULT NULL COMMENT 'Pre-defined field
value expression',
+ `field_type` varchar(50) NOT NULL COMMENT 'Field type',
+ `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
+ `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a
meta field? 0: no, 1: yes',
+ `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format,
including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such
as yyyy-MM-dd HH:mm:ss',
+ `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order
(front-end display field order)',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete,
0: not deleted, > 0: deleted',
+ `origin_node_name` varchar(256) DEFAULT '' COMMENT 'Origin Node name
which stream field belongs',
+ `origin_field_name` varchar(50) DEFAULT '' COMMENT 'Origin field name
before transform operation',
PRIMARY KEY (`id`),
KEY `index_transform_id` (`transform_id`)
) ENGINE = InnoDB
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 5d8a4ee63..ed9baaa6a 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -651,21 +651,22 @@ CREATE TABLE `stream_source_field`
DROP TABLE IF EXISTS `stream_transform_field`;
CREATE TABLE `stream_transform_field`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
- `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
- `transform_id` int(11) NOT NULL COMMENT 'Transform id',
- `transform_type` varchar(15) NOT NULL COMMENT 'Transform type',
- `field_name` varchar(50) NOT NULL COMMENT 'Field name',
- `field_value` varchar(128) DEFAULT NULL COMMENT 'Field value,
required if it is a predefined field',
- `pre_expression` varchar(256) DEFAULT NULL COMMENT 'Pre-defined field
value expression',
- `field_type` varchar(50) NOT NULL COMMENT 'Field type',
- `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
- `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta
field? 0: no, 1: yes',
- `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format,
including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such
as yyyy-MM-dd HH:mm:ss',
- `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order
(front-end display field order)',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete,
0: not deleted, > 0: deleted',
- `origin_node_name` varchar(256) DEFAULT '' COMMENT 'Origin Node name
which stream field belongs',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `transform_id` int(11) NOT NULL COMMENT 'Transform id',
+ `transform_type` varchar(15) NOT NULL COMMENT 'Transform type',
+ `field_name` varchar(50) NOT NULL COMMENT 'Field name',
+ `field_value` varchar(128) DEFAULT NULL COMMENT 'Field value,
required if it is a predefined field',
+ `pre_expression` varchar(256) DEFAULT NULL COMMENT 'Pre-defined field
value expression',
+ `field_type` varchar(50) NOT NULL COMMENT 'Field type',
+ `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
+ `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a
meta field? 0: no, 1: yes',
+ `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format,
including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such
as yyyy-MM-dd HH:mm:ss',
+ `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order
(front-end display field order)',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete,
0: not deleted, > 0: deleted',
+ `origin_node_name` varchar(256) DEFAULT '' COMMENT 'Origin Node name
which stream field belongs',
+ `origin_field_name` varchar(50) DEFAULT '' COMMENT 'Origin field name
before transform operation',
PRIMARY KEY (`id`),
KEY `index_transform_id` (`transform_id`)
) ENGINE = InnoDB