This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ba64f0ad8 [INLONG-4034][Manager] Add originFieldName in StreamField 
(#4037)
ba64f0ad8 is described below

commit ba64f0ad84cbce7ad099022facfb2477a2f8448d
Author: kipshi <[email protected]>
AuthorDate: Fri Apr 29 20:08:06 2022 +0800

    [INLONG-4034][Manager] Add originFieldName in StreamField (#4037)
---
 .../manager/common/pojo/stream/StreamField.java    | 11 +++++
 .../dao/entity/StreamTransformFieldEntity.java     |  2 +
 .../mappers/StreamTransformFieldEntityMapper.xml   | 52 ++++++++++++++--------
 .../service/sort/util/ExtractNodeUtils.java        |  6 ++-
 .../service/sort/util/FieldRelationShipUtils.java  | 19 +++++++-
 .../main/resources/sql/apache_inlong_manager.sql   | 31 ++++++-------
 .../manager-web/sql/apache_inlong_manager.sql      | 31 ++++++-------
 7 files changed, 101 insertions(+), 51 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
index e2e4b1efc..381576015 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
@@ -56,6 +56,14 @@ public class StreamField {
         this.originNodeName = originNodeName;
     }
 
+    public StreamField(int index, FieldType fieldType, String fieldName, 
String fieldComment, String fieldValue,
+            Integer isMetaField, String originNodeName, String 
originFieldName) {
+        this(index, fieldType, fieldName, fieldComment, fieldValue);
+        this.isMetaField = isMetaField;
+        this.originNodeName = originNodeName;
+        this.originFieldName = originFieldName;
+    }
+
     @ApiModelProperty("Field index")
     private Integer id;
 
@@ -81,4 +89,7 @@ public class StreamField {
     @ApiModelProperty("Origin Node name which stream field belongs")
     private String originNodeName;
 
+    @ApiModelProperty("Origin field name before transform operation")
+    private String originFieldName;
+
 }
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
index d88eea123..a7d72b3e1 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
@@ -53,4 +53,6 @@ public class StreamTransformFieldEntity implements 
Serializable {
     private Integer isDeleted;
 
     private String originNodeName;
+
+    private String originFieldName;
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
index c0e8fff14..2ea774647 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
@@ -36,12 +36,13 @@
         <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
         <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
         <result column="origin_node_name" jdbcType="VARCHAR" 
property="originNodeName"/>
+        <result column="origin_field_name" jdbcType="VARCHAR" 
property="originFieldName"/>
     </resultMap>
     <sql id="Base_Column_List">
         id
         , inlong_group_id, inlong_stream_id, transform_id, transform_type, 
field_name,
     field_value, pre_expression, field_type, field_comment, is_meta_field, 
field_format, 
-    rank_num, is_deleted, origin_node_name
+    rank_num, is_deleted, origin_node_name, origin_field_name
     </sql>
     <select id="selectByTransformId" 
resultType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
         select
@@ -77,12 +78,14 @@
                                             transform_id, transform_type, 
field_name,
                                             field_value, pre_expression, 
field_type,
                                             field_comment, is_meta_field, 
field_format,
-                                            rank_num, is_deleted, 
origin_node_name)
+                                            rank_num, is_deleted, 
origin_node_name,
+                                            origin_field_name)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, 
#{inlongStreamId,jdbcType=VARCHAR},
                 #{transformId,jdbcType=INTEGER}, 
#{transformType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
                 #{fieldValue,jdbcType=VARCHAR}, 
#{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
                 #{fieldComment,jdbcType=VARCHAR}, 
#{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
-                #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER}, 
#{originNodeName,jdbcType=VARCHAR})
+                #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER}, 
#{originNodeName,jdbcType=VARCHAR},
+                #{originFieldName,jdbcType=VARCHAR})
     </insert>
     <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
             
parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
@@ -133,6 +136,9 @@
             <if test="originNodeName != null">
                 origin_node_name,
             </if>
+            <if test="originFieldName != null">
+                origin_field_name,
+            </if>
         </trim>
         <trim prefix="values (" suffix=")" suffixOverrides=",">
             <if test="id != null">
@@ -180,6 +186,9 @@
             <if test="originNodeName != null">
                 #{originNodeName,jdbcType=VARCHAR},
             </if>
+            <if test="originFieldName != null">
+                #{originFieldName,jdbcType=VARCHAR},
+            </if>
         </trim>
     </insert>
     <update id="updateByPrimaryKeySelective"
@@ -228,25 +237,29 @@
             <if test="originNodeName != null">
                 origin_node_name = #{originNodeName,jdbcType=VARCHAR},
             </if>
+            <if test="originFieldName != null">
+                origin_field_name = #{originFieldName,jdbcType=VARCHAR},
+            </if>
         </set>
         where id = #{id,jdbcType=INTEGER}
     </update>
     <update id="updateByPrimaryKey" 
parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
         update stream_transform_field
-        set inlong_group_id  = #{inlongGroupId,jdbcType=VARCHAR},
-            inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-            transform_id     = #{transformId,jdbcType=INTEGER},
-            transform_type   = #{transformType,jdbcType=VARCHAR},
-            field_name       = #{fieldName,jdbcType=VARCHAR},
-            field_value      = #{fieldValue,jdbcType=VARCHAR},
-            pre_expression   = #{preExpression,jdbcType=VARCHAR},
-            field_type       = #{fieldType,jdbcType=VARCHAR},
-            field_comment    = #{fieldComment,jdbcType=VARCHAR},
-            is_meta_field    = #{isMetaField,jdbcType=SMALLINT},
-            field_format     = #{fieldFormat,jdbcType=VARCHAR},
-            rank_num         = #{rankNum,jdbcType=SMALLINT},
-            is_deleted       = #{isDeleted,jdbcType=INTEGER},
-            origin_node_name = #{originNodeName,jdbcType=VARCHAR}
+        set inlong_group_id   = #{inlongGroupId,jdbcType=VARCHAR},
+            inlong_stream_id  = #{inlongStreamId,jdbcType=VARCHAR},
+            transform_id      = #{transformId,jdbcType=INTEGER},
+            transform_type    = #{transformType,jdbcType=VARCHAR},
+            field_name        = #{fieldName,jdbcType=VARCHAR},
+            field_value       = #{fieldValue,jdbcType=VARCHAR},
+            pre_expression    = #{preExpression,jdbcType=VARCHAR},
+            field_type        = #{fieldType,jdbcType=VARCHAR},
+            field_comment     = #{fieldComment,jdbcType=VARCHAR},
+            is_meta_field     = #{isMetaField,jdbcType=SMALLINT},
+            field_format      = #{fieldFormat,jdbcType=VARCHAR},
+            rank_num          = #{rankNum,jdbcType=SMALLINT},
+            is_deleted        = #{isDeleted,jdbcType=INTEGER},
+            origin_node_name  = #{originNodeName,jdbcType=VARCHAR},
+            origin_field_name = #{originFieldName,jdbcType=VARCHAR}
         where id = #{id,jdbcType=INTEGER}
     </update>
 
@@ -255,7 +268,8 @@
         transform_id, transform_type, field_name,
         field_value, pre_expression, field_type,
         field_comment, is_meta_field, field_format,
-        rank_num, is_deleted, origin_node_name)
+        rank_num, is_deleted, origin_node_name,
+        origin_field_name)
         values
         <foreach collection="list" index="index" item="item" separator=",">
             (#{item.id,jdbcType=INTEGER}, 
#{item.inlongGroupId,jdbcType=VARCHAR},
@@ -267,7 +281,7 @@
             #{item.fieldComment,jdbcType=VARCHAR}, 
#{item.isMetaField,jdbcType=SMALLINT},
             #{item.fieldFormat,jdbcType=VARCHAR},
             #{item.rankNum,jdbcType=SMALLINT}, 
#{item.isDeleted,jdbcType=INTEGER},
-            #{item.originNodeName,jdbcType=VARCHAR})
+            #{item.originNodeName,jdbcType=VARCHAR}, 
#{item.originFieldName,jdbcType=VARCHAR})
         </foreach>
     </insert>
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 5f4681c3d..fba6fb566 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -97,6 +97,10 @@ public class ExtractNodeUtils {
                 .map(streamFieldInfo -> 
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
                 .collect(Collectors.toList());
         String serverTimeZone = binlogSourceResponse.getServerTimezone();
+        boolean incrementalSnapshotEnabled = true;
+        if (binlogSourceResponse.isAllMigration()) {
+            incrementalSnapshotEnabled = false;
+        }
         return new MySqlExtractNode(id,
                 name,
                 fieldInfos,
@@ -110,7 +114,7 @@ public class ExtractNodeUtils {
                 database,
                 port,
                 serverId,
-                true,
+                incrementalSnapshotEnabled,
                 serverTimeZone);
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
index 2e344dcca..20d7310bb 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
@@ -31,6 +31,7 @@ import 
org.apache.inlong.manager.common.pojo.transform.splitter.SplitterDefiniti
 import 
org.apache.inlong.manager.common.pojo.transform.splitter.SplitterDefinition.SplitRule;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.util.StreamParseUtils;
+import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -61,8 +62,9 @@ public class FieldRelationShipUtils {
                 return createReplacerFieldRelationShips(fieldList, 
transformName, replacerDefinition, preNodes);
             case DE_DUPLICATION:
             case FILTER:
-            case JOINER:
                 return createFieldRelationShips(fieldList, transformName);
+            case JOINER:
+                return createJoinerFieldRelationShips(fieldList, 
transformName);
             default:
                 throw new UnsupportedOperationException(
                         String.format("Unsupported transformType=%s for 
Inlong", transformType));
@@ -81,6 +83,21 @@ public class FieldRelationShipUtils {
                 }).collect(Collectors.toList());
     }
 
+    private static List<FieldRelationShip> 
createJoinerFieldRelationShips(List<StreamField> fieldList,
+            String transformName) {
+        return fieldList.stream()
+                .map(streamFieldInfo -> {
+                    FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
+                            streamFieldInfo.getFieldType().name(),
+                            streamFieldInfo.getFieldFormat());
+                    FieldInfo inputField = new 
FieldInfo(streamFieldInfo.getOriginFieldName(),
+                            streamFieldInfo.getOriginNodeName(), formatInfo);
+                    FieldInfo outputField = new 
FieldInfo(streamFieldInfo.getFieldName(),
+                            transformName, formatInfo);
+                    return new FieldRelationShip(inputField, outputField);
+                }).collect(Collectors.toList());
+    }
+
     private static List<FieldRelationShip> 
createSplitterFieldRelationShips(List<StreamField> fieldList,
             String transformName, SplitterDefinition splitterDefinition, 
String preNodes) {
         Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not 
be null");
diff --git 
a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index c86a00abc..d182ba63e 100644
--- 
a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ 
b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -621,21 +621,22 @@ CREATE TABLE `stream_source_field`
 DROP TABLE IF EXISTS `stream_transform_field`;
 CREATE TABLE `stream_transform_field`
 (
-    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
-    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `transform_id`     int(11)      NOT NULL COMMENT 'Transform id',
-    `transform_type`   varchar(15)  NOT NULL COMMENT 'Transform type',
-    `field_name`       varchar(50)  NOT NULL COMMENT 'Field name',
-    `field_value`      varchar(128)  DEFAULT NULL COMMENT 'Field value, 
required if it is a predefined field',
-    `pre_expression`   varchar(256)  DEFAULT NULL COMMENT 'Pre-defined field 
value expression',
-    `field_type`       varchar(50)  NOT NULL COMMENT 'Field type',
-    `field_comment`    varchar(2000) DEFAULT NULL COMMENT 'Field description',
-    `is_meta_field`    smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta 
field? 0: no, 1: yes',
-    `field_format`     varchar(50)   DEFAULT NULL COMMENT 'Field format, 
including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such 
as yyyy-MM-dd HH:mm:ss',
-    `rank_num`         smallint(6) DEFAULT '0' COMMENT 'Field order (front-end 
display field order)',
-    `is_deleted`       int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not 
deleted, > 0: deleted',
-    `origin_node_name` varchar(256)  DEFAULT '' COMMENT 'Origin Node name 
which stream field belongs',
+    `id`                int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `inlong_group_id`   varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`  varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `transform_id`      int(11)      NOT NULL COMMENT 'Transform id',
+    `transform_type`    varchar(15)  NOT NULL COMMENT 'Transform type',
+    `field_name`        varchar(50)  NOT NULL COMMENT 'Field name',
+    `field_value`       varchar(128)  DEFAULT NULL COMMENT 'Field value, 
required if it is a predefined field',
+    `pre_expression`    varchar(256)  DEFAULT NULL COMMENT 'Pre-defined field 
value expression',
+    `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
+    `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
+    `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a 
meta field? 0: no, 1: yes',
+    `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format, 
including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such 
as yyyy-MM-dd HH:mm:ss',
+    `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order 
(front-end display field order)',
+    `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 
0: not deleted, > 0: deleted',
+    `origin_node_name`  varchar(256)  DEFAULT '' COMMENT 'Origin Node name 
which stream field belongs',
+    `origin_field_name` varchar(50)   DEFAULT '' COMMENT 'Origin field name 
before transform operation',
     PRIMARY KEY (`id`),
     KEY `index_transform_id` (`transform_id`)
 ) ENGINE = InnoDB
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 5d8a4ee63..ed9baaa6a 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -651,21 +651,22 @@ CREATE TABLE `stream_source_field`
 DROP TABLE IF EXISTS `stream_transform_field`;
 CREATE TABLE `stream_transform_field`
 (
-    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
-    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `transform_id`     int(11)      NOT NULL COMMENT 'Transform id',
-    `transform_type`   varchar(15)  NOT NULL COMMENT 'Transform type',
-    `field_name`       varchar(50)  NOT NULL COMMENT 'Field name',
-    `field_value`      varchar(128)  DEFAULT NULL COMMENT 'Field value, 
required if it is a predefined field',
-    `pre_expression`   varchar(256)  DEFAULT NULL COMMENT 'Pre-defined field 
value expression',
-    `field_type`       varchar(50)  NOT NULL COMMENT 'Field type',
-    `field_comment`    varchar(2000) DEFAULT NULL COMMENT 'Field description',
-    `is_meta_field`    smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta 
field? 0: no, 1: yes',
-    `field_format`     varchar(50)   DEFAULT NULL COMMENT 'Field format, 
including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such 
as yyyy-MM-dd HH:mm:ss',
-    `rank_num`         smallint(6)   DEFAULT '0' COMMENT 'Field order 
(front-end display field order)',
-    `is_deleted`       int(11)       DEFAULT '0' COMMENT 'Whether to delete, 
0: not deleted, > 0: deleted',
-    `origin_node_name` varchar(256)  DEFAULT '' COMMENT 'Origin Node name 
which stream field belongs',
+    `id`                int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `inlong_group_id`   varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`  varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `transform_id`      int(11)      NOT NULL COMMENT 'Transform id',
+    `transform_type`    varchar(15)  NOT NULL COMMENT 'Transform type',
+    `field_name`        varchar(50)  NOT NULL COMMENT 'Field name',
+    `field_value`       varchar(128)  DEFAULT NULL COMMENT 'Field value, 
required if it is a predefined field',
+    `pre_expression`    varchar(256)  DEFAULT NULL COMMENT 'Pre-defined field 
value expression',
+    `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
+    `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
+    `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a 
meta field? 0: no, 1: yes',
+    `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format, 
including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such 
as yyyy-MM-dd HH:mm:ss',
+    `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order 
(front-end display field order)',
+    `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 
0: not deleted, > 0: deleted',
+    `origin_node_name`  varchar(256)  DEFAULT '' COMMENT 'Origin Node name 
which stream field belongs',
+    `origin_field_name` varchar(50)   DEFAULT '' COMMENT 'Origin field name 
before transform operation',
     PRIMARY KEY (`id`),
     KEY `index_transform_id` (`transform_id`)
 ) ENGINE = InnoDB

Reply via email to