This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fe1653ca [INLONG-5070][Manager] Add more parameter settings support for stream source (#5071)
3fe1653ca is described below

commit 3fe1653cab1f82b2ee33885d03ede37dcc43feee
Author: yunqingmoswu <[email protected]>
AuthorDate: Sat Jul 16 23:15:45 2022 +0800

    [INLONG-5070][Manager] Add more parameter settings support for stream source (#5071)
---
 .../manager/common/pojo/source/SourceRequest.java  |  5 +++
 .../manager/common/pojo/source/StreamSource.java   |  5 +++
 .../common/pojo/source/file/FileSourceDTO.java     |  5 +++
 .../common/pojo/source/kafka/KafkaSourceDTO.java   |  5 +++
 .../pojo/source/mongodb/MongoDBSourceDTO.java      |  5 +++
 .../pojo/source/mysql/MySQLBinlogSourceDTO.java    |  5 +++
 .../common/pojo/source/oracle/OracleSourceDTO.java |  5 +++
 .../source/postgresql/PostgreSQLSourceDTO.java     |  5 +++
 .../common/pojo/source/pulsar/PulsarSourceDTO.java |  5 +++
 .../pojo/source/sqlserver/SQLServerSourceDTO.java  |  5 +++
 .../common/pojo/source/tubemq/TubeMQSourceDTO.java |  7 ++++-
 .../service/sort/util/ExtractNodeUtils.java        | 36 +++++++++++++---------
 .../service/source/StreamSourceServiceTest.java    |  8 ++++-
 13 files changed, 84 insertions(+), 17 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index 59c6859c2..657afdbe4 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -28,7 +28,9 @@ import org.hibernate.validator.constraints.Length;
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
 import javax.validation.constraints.Pattern;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Stream source request
@@ -89,4 +91,7 @@ public class SourceRequest {
     @ApiModelProperty("Field list, only support when inlong group in light 
weight mode")
     private List<StreamField> fieldList;
 
+    @ApiModelProperty("Other properties if needed")
+    private Map<String, Object> properties = new LinkedHashMap<>();
+
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/StreamSource.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/StreamSource.java
index 5f82fa6ea..d0c545a7b 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/StreamSource.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/StreamSource.java
@@ -29,6 +29,8 @@ import lombok.experimental.SuperBuilder;
 import org.apache.inlong.manager.common.pojo.stream.StreamNode;
 
 import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 /**
  * Stream source info, including source name, agent ip, etc.
@@ -100,6 +102,9 @@ public abstract class StreamSource extends StreamNode {
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 
+    @ApiModelProperty("Properties for source")
+    private Map<String, Object> properties = new LinkedHashMap<>();
+
     public SourceRequest genSourceRequest() {
         return null;
     }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
index 3c9b6737f..0f1e0a478 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.Map;
 
 /**
  * File source information data transfer object
@@ -53,11 +54,15 @@ public class FileSourceDTO {
             + "Null or blank means from current timestamp")
     private String timeOffset;
 
+    @ApiModelProperty("Properties for File")
+    private Map<String, Object> properties;
+
     public static FileSourceDTO getFromRequest(@NotNull FileSourceRequest 
fileSourceRequest) {
         return FileSourceDTO.builder()
                 .ip(fileSourceRequest.getIp())
                 .pattern(fileSourceRequest.getPattern())
                 .timeOffset(fileSourceRequest.getTimeOffset())
+                .properties(fileSourceRequest.getProperties())
                 .build();
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index e239d813f..467888c0d 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.Map;
 
 /**
  * kafka source information data transfer object.
@@ -88,6 +89,9 @@ public class KafkaSourceDTO {
     @ApiModelProperty("Field needed when serializationType is csv,json,avro")
     private String primaryKey;
 
+    @ApiModelProperty("Properties for Kafka")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
@@ -106,6 +110,7 @@ public class KafkaSourceDTO {
                 .ignoreParseErrors(request.isIgnoreParseErrors())
                 .timestampFormatStandard(request.getTimestampFormatStandard())
                 .primaryKey(request.getPrimaryKey())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
index 1e629755e..5b21955d3 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.Map;
 
 /**
  * MongoDB source info
@@ -58,6 +59,9 @@ public class MongoDBSourceDTO {
     @ApiModelProperty("Primary key must be shared by all tables")
     private String primaryKey;
 
+    @ApiModelProperty("Properties for MongoDB")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
@@ -69,6 +73,7 @@ public class MongoDBSourceDTO {
                 .password(request.getPassword())
                 .database(request.getDatabase())
                 .collection(request.getCollection())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
index 23bc07a07..ae63bad4a 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.Map;
 
 /**
  * Binlog source info
@@ -118,6 +119,9 @@ public class MySQLBinlogSourceDTO {
     @ApiModelProperty("Directly read binlog from the specified offset 
position")
     private Integer specificOffsetPos;
 
+    @ApiModelProperty("Properties for MySQL")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
@@ -141,6 +145,7 @@ public class MySQLBinlogSourceDTO {
                 .primaryKey(request.getPrimaryKey())
                 .specificOffsetFile(request.getSpecificOffsetFile())
                 .specificOffsetPos(request.getSpecificOffsetPos())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
index 02c7e29bf..d34acfbb7 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.Map;
 
 /**
  * Oracle source info
@@ -67,6 +68,9 @@ public class OracleSourceDTO {
     @ApiModelProperty("Primary key must be shared by all tables")
     private String primaryKey;
 
+    @ApiModelProperty("Properties for Oracle")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
@@ -81,6 +85,7 @@ public class OracleSourceDTO {
                 .tableName(request.getTableName())
                 .primaryKey(request.getPrimaryKey())
                 .scanStartupMode(request.getScanStartupMode())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
index 69b70e9b3..367a7c861 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
@@ -29,6 +29,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
 import java.util.List;
+import java.util.Map;
 
 /**
  * PostgreSQL source info
@@ -68,6 +69,9 @@ public class PostgreSQLSourceDTO {
     @ApiModelProperty("Primary key must be shared by all tables")
     private String primaryKey;
 
+    @ApiModelProperty("Properties for PostgreSQL")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
@@ -82,6 +86,7 @@ public class PostgreSQLSourceDTO {
                 .tableNameList(request.getTableNameList())
                 .primaryKey(request.getPrimaryKey())
                 .decodingPluginName(request.getDecodingPluginName())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
index 9f943441b..4b8f292c3 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.Map;
 
 /**
  * Pulsar source information data transfer object
@@ -63,6 +64,9 @@ public class PulsarSourceDTO {
     @Builder.Default
     private String scanStartupMode = "earliest";
 
+    @ApiModelProperty("Properties for Pulsar")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
@@ -75,6 +79,7 @@ public class PulsarSourceDTO {
                 .topic(request.getTopic())
                 .primaryKey(request.getPrimaryKey())
                 .scanStartupMode(request.getScanStartupMode())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
index 03b6177a8..ad412c5b4 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.Map;
 
 /**
  * SQLServer source info
@@ -70,6 +71,9 @@ public class SQLServerSourceDTO {
     @ApiModelProperty("Primary key must be shared by all tables")
     private String primaryKey;
 
+    @ApiModelProperty("Properties for SQLServer")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
@@ -85,6 +89,7 @@ public class SQLServerSourceDTO {
                 .serverTimezone(request.getServerTimezone())
                 .allMigration(request.isAllMigration())
                 .primaryKey(request.getPrimaryKey())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
index f0015a764..c45f7df0a 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.Map;
 import java.util.TreeSet;
 
 /**
@@ -62,7 +63,10 @@ public class TubeMQSourceDTO {
      */
     @ApiModelProperty("Tid of the TubeMQ")
     private TreeSet<String> tid;
-    
+
+    @ApiModelProperty("Properties for TubeMQ")
+    private Map<String, Object> properties;
+
     /**
      * Get the dto instance from the request
      */
@@ -74,6 +78,7 @@ public class TubeMQSourceDTO {
                 .groupId(request.getGroupId())
                 .sessionKey(request.getSessionKey())
                 .tid(request.getTid())
+                .properties(request.getProperties())
                 .build();
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 3683b5a1e..032e7303c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.sort.util;
 
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -96,7 +95,7 @@ public class ExtractNodeUtils {
             case MONGODB:
                 return createExtractNode((MongoDBSource) sourceInfo);
             case TUBEMQ:
-                return createExtractNode((TubeMQSource)sourceInfo);
+                return createExtractNode((TubeMQSource) sourceInfo);
             default:
                 throw new IllegalArgumentException(
                         String.format("Unsupported sourceType=%s to create 
extractNode", sourceType));
@@ -132,13 +131,13 @@ public class ExtractNodeUtils {
         boolean incrementalSnapshotEnabled = true;
 
         // TODO Needs to be configurable for those parameters
-        Map<String, String> properties = Maps.newHashMap();
+        Map<String, String> properties = 
binlogSource.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
         if (binlogSource.isAllMigration()) {
             // Unique properties when migrate all tables in database
             incrementalSnapshotEnabled = false;
             properties.put("migrate-all", "true");
         }
-        properties.put("append-mode", "true");
         if (StringUtils.isEmpty(primaryKey)) {
             incrementalSnapshotEnabled = false;
             properties.put("scan.incremental.snapshot.enabled", "false");
@@ -208,12 +207,13 @@ public class ExtractNodeUtils {
         }
         final String primaryKey = kafkaSource.getPrimaryKey();
         String groupId = kafkaSource.getGroupId();
-
+        Map<String, String> properties = 
kafkaSource.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
         return new KafkaExtractNode(id,
                 name,
                 fieldInfos,
                 null,
-                Maps.newHashMap(),
+                properties,
                 topic,
                 bootstrapServers,
                 format,
@@ -268,12 +268,13 @@ public class ExtractNodeUtils {
         final String primaryKey = pulsarSource.getPrimaryKey();
         final String serviceUrl = pulsarSource.getServiceUrl();
         final String adminUrl = pulsarSource.getAdminUrl();
-
+        Map<String, String> properties = 
pulsarSource.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
         return new PulsarExtractNode(id,
                 name,
                 fieldInfos,
                 null,
-                Maps.newHashMap(),
+                properties,
                 fullTopicName,
                 adminUrl,
                 serviceUrl,
@@ -295,7 +296,9 @@ public class ExtractNodeUtils {
         List<FieldInfo> fields = streamFields.stream()
                 .map(streamFieldInfo -> 
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
                 .collect(Collectors.toList());
-        return new PostgresExtractNode(id, name, fields, null, null,
+        Map<String, String> properties = 
postgreSQLSource.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
+        return new PostgresExtractNode(id, name, fields, null, properties,
                 postgreSQLSource.getPrimaryKey(), 
postgreSQLSource.getTableNameList(),
                 postgreSQLSource.getHostname(), postgreSQLSource.getUsername(),
                 postgreSQLSource.getPassword(), postgreSQLSource.getDatabase(),
@@ -316,10 +319,10 @@ public class ExtractNodeUtils {
                 .map(streamFieldInfo -> 
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
                 .collect(Collectors.toList());
 
-        Map<String, String> properties = Maps.newHashMap();
         ScanStartUpMode scanStartupMode = 
StringUtils.isBlank(source.getScanStartupMode())
                 ? null : ScanStartUpMode.forName(source.getScanStartupMode());
-
+        Map<String, String> properties = 
source.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
         return new OracleExtractNode(
                 name,
                 name,
@@ -351,7 +354,8 @@ public class ExtractNodeUtils {
                 .map(fieldInfo -> 
FieldInfoUtils.parseStreamFieldInfo(fieldInfo, name))
                 .collect(Collectors.toList());
 
-        Map<String, String> properties = Maps.newHashMap();
+        Map<String, String> properties = 
source.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
         return new SqlServerExtractNode(
                 name,
                 name,
@@ -382,7 +386,8 @@ public class ExtractNodeUtils {
         List<FieldInfo> fieldInfos = streamFields.stream()
                 .map(streamFieldInfo -> 
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
                 .collect(Collectors.toList());
-        Map<String, String> properties = Maps.newHashMap();
+        Map<String, String> properties = 
source.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
         return new MongoExtractNode(
                 name,
                 name,
@@ -409,13 +414,14 @@ public class ExtractNodeUtils {
         List<FieldInfo> fieldInfos = streamFields.stream()
                 .map(streamFieldInfo -> 
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
                 .collect(Collectors.toList());
-        Map<String, String> properties = Maps.newHashMap();
+        Map<String, String> properties = 
source.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
         return new TubeMQExtractNode(
                 name,
                 name,
                 fieldInfos,
                 null,
-                properties, 
+                properties,
                 source.getMasterRpc(),
                 source.getTopic(),
                 source.getSerializationType(),
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
index 056c55043..716886e87 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.source;
 
+import org.apache.curator.shaded.com.google.common.collect.Maps;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
 import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSource;
@@ -28,6 +29,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.util.Map;
+
 /**
  * Stream source service test
  */
@@ -50,6 +53,9 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         String sourceName = "stream_source_service_test";
         sourceInfo.setSourceName(sourceName);
         sourceInfo.setSourceType(SourceType.BINLOG.getType());
+        Map<String, Object> properties = Maps.newLinkedHashMap();
+        properties.put("append-mode", "true");
+        sourceInfo.setProperties(properties);
         return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
     }
 
@@ -82,7 +88,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         MySQLBinlogSourceRequest request = 
CommonBeanUtils.copyProperties(binlogResponse,
                 MySQLBinlogSourceRequest::new);
         boolean result = sourceService.update(request, GLOBAL_OPERATOR);
-        Assertions.assertTrue(result);
+        
Assertions.assertTrue("true".equals(binlogResponse.getProperties().get("append-mode"))
 && result);
 
         sourceService.delete(id, GLOBAL_OPERATOR);
     }

Reply via email to