This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3fe1653ca [INLONG-5070][Manager] Add more parameter settings support
for stream source (#5071)
3fe1653ca is described below
commit 3fe1653cab1f82b2ee33885d03ede37dcc43feee
Author: yunqingmoswu <[email protected]>
AuthorDate: Sat Jul 16 23:15:45 2022 +0800
[INLONG-5070][Manager] Add more parameter settings support for stream
source (#5071)
---
.../manager/common/pojo/source/SourceRequest.java | 5 +++
.../manager/common/pojo/source/StreamSource.java | 5 +++
.../common/pojo/source/file/FileSourceDTO.java | 5 +++
.../common/pojo/source/kafka/KafkaSourceDTO.java | 5 +++
.../pojo/source/mongodb/MongoDBSourceDTO.java | 5 +++
.../pojo/source/mysql/MySQLBinlogSourceDTO.java | 5 +++
.../common/pojo/source/oracle/OracleSourceDTO.java | 5 +++
.../source/postgresql/PostgreSQLSourceDTO.java | 5 +++
.../common/pojo/source/pulsar/PulsarSourceDTO.java | 5 +++
.../pojo/source/sqlserver/SQLServerSourceDTO.java | 5 +++
.../common/pojo/source/tubemq/TubeMQSourceDTO.java | 7 ++++-
.../service/sort/util/ExtractNodeUtils.java | 36 +++++++++++++---------
.../service/source/StreamSourceServiceTest.java | 8 ++++-
13 files changed, 84 insertions(+), 17 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index 59c6859c2..657afdbe4 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -28,7 +28,9 @@ import org.hibernate.validator.constraints.Length;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
/**
* Stream source request
@@ -89,4 +91,7 @@ public class SourceRequest {
@ApiModelProperty("Field list, only support when inlong group in light
weight mode")
private List<StreamField> fieldList;
+ @ApiModelProperty("Other properties if needed")
+ private Map<String, Object> properties = new LinkedHashMap<>();
+
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/StreamSource.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/StreamSource.java
index 5f82fa6ea..d0c545a7b 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/StreamSource.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/StreamSource.java
@@ -29,6 +29,8 @@ import lombok.experimental.SuperBuilder;
import org.apache.inlong.manager.common.pojo.stream.StreamNode;
import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.Map;
/**
* Stream source info, including source name, agent ip, etc.
@@ -100,6 +102,9 @@ public abstract class StreamSource extends StreamNode {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date modifyTime;
+ @ApiModelProperty("Properties for source")
+ private Map<String, Object> properties = new LinkedHashMap<>();
+
public SourceRequest genSourceRequest() {
return null;
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
index 3c9b6737f..0f1e0a478 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
+import java.util.Map;
/**
* File source information data transfer object
@@ -53,11 +54,15 @@ public class FileSourceDTO {
+ "Null or blank means from current timestamp")
private String timeOffset;
+ @ApiModelProperty("Properties for File")
+ private Map<String, Object> properties;
+
public static FileSourceDTO getFromRequest(@NotNull FileSourceRequest
fileSourceRequest) {
return FileSourceDTO.builder()
.ip(fileSourceRequest.getIp())
.pattern(fileSourceRequest.getPattern())
.timeOffset(fileSourceRequest.getTimeOffset())
+ .properties(fileSourceRequest.getProperties())
.build();
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index e239d813f..467888c0d 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
+import java.util.Map;
/**
* kafka source information data transfer object.
@@ -88,6 +89,9 @@ public class KafkaSourceDTO {
@ApiModelProperty("Field needed when serializationType is csv,json,avro")
private String primaryKey;
+ @ApiModelProperty("Properties for Kafka")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
@@ -106,6 +110,7 @@ public class KafkaSourceDTO {
.ignoreParseErrors(request.isIgnoreParseErrors())
.timestampFormatStandard(request.getTimestampFormatStandard())
.primaryKey(request.getPrimaryKey())
+ .properties(request.getProperties())
.build();
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
index 1e629755e..5b21955d3 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
+import java.util.Map;
/**
* MongoDB source info
@@ -58,6 +59,9 @@ public class MongoDBSourceDTO {
@ApiModelProperty("Primary key must be shared by all tables")
private String primaryKey;
+ @ApiModelProperty("Properties for MongoDB")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
@@ -69,6 +73,7 @@ public class MongoDBSourceDTO {
.password(request.getPassword())
.database(request.getDatabase())
.collection(request.getCollection())
+ .properties(request.getProperties())
.build();
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
index 23bc07a07..ae63bad4a 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
+import java.util.Map;
/**
* Binlog source info
@@ -118,6 +119,9 @@ public class MySQLBinlogSourceDTO {
@ApiModelProperty("Directly read binlog from the specified offset
position")
private Integer specificOffsetPos;
+ @ApiModelProperty("Properties for MySQL")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
@@ -141,6 +145,7 @@ public class MySQLBinlogSourceDTO {
.primaryKey(request.getPrimaryKey())
.specificOffsetFile(request.getSpecificOffsetFile())
.specificOffsetPos(request.getSpecificOffsetPos())
+ .properties(request.getProperties())
.build();
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
index 02c7e29bf..d34acfbb7 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
+import java.util.Map;
/**
* Oracle source info
@@ -67,6 +68,9 @@ public class OracleSourceDTO {
@ApiModelProperty("Primary key must be shared by all tables")
private String primaryKey;
+ @ApiModelProperty("Properties for Oracle")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
@@ -81,6 +85,7 @@ public class OracleSourceDTO {
.tableName(request.getTableName())
.primaryKey(request.getPrimaryKey())
.scanStartupMode(request.getScanStartupMode())
+ .properties(request.getProperties())
.build();
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
index 69b70e9b3..367a7c861 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
@@ -29,6 +29,7 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
import java.util.List;
+import java.util.Map;
/**
* PostgreSQL source info
@@ -68,6 +69,9 @@ public class PostgreSQLSourceDTO {
@ApiModelProperty("Primary key must be shared by all tables")
private String primaryKey;
+ @ApiModelProperty("Properties for PostgreSQL")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
@@ -82,6 +86,7 @@ public class PostgreSQLSourceDTO {
.tableNameList(request.getTableNameList())
.primaryKey(request.getPrimaryKey())
.decodingPluginName(request.getDecodingPluginName())
+ .properties(request.getProperties())
.build();
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
index 9f943441b..4b8f292c3 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
+import java.util.Map;
/**
* Pulsar source information data transfer object
@@ -63,6 +64,9 @@ public class PulsarSourceDTO {
@Builder.Default
private String scanStartupMode = "earliest";
+ @ApiModelProperty("Properties for Pulsar")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
@@ -75,6 +79,7 @@ public class PulsarSourceDTO {
.topic(request.getTopic())
.primaryKey(request.getPrimaryKey())
.scanStartupMode(request.getScanStartupMode())
+ .properties(request.getProperties())
.build();
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
index 03b6177a8..ad412c5b4 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
+import java.util.Map;
/**
* SQLServer source info
@@ -70,6 +71,9 @@ public class SQLServerSourceDTO {
@ApiModelProperty("Primary key must be shared by all tables")
private String primaryKey;
+ @ApiModelProperty("Properties for SQLServer")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
@@ -85,6 +89,7 @@ public class SQLServerSourceDTO {
.serverTimezone(request.getServerTimezone())
.allMigration(request.isAllMigration())
.primaryKey(request.getPrimaryKey())
+ .properties(request.getProperties())
.build();
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
index f0015a764..c45f7df0a 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
+import java.util.Map;
import java.util.TreeSet;
/**
@@ -62,7 +63,10 @@ public class TubeMQSourceDTO {
*/
@ApiModelProperty("Tid of the TubeMQ")
private TreeSet<String> tid;
-
+
+ @ApiModelProperty("Properties for TubeMQ")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
@@ -74,6 +78,7 @@ public class TubeMQSourceDTO {
.groupId(request.getGroupId())
.sessionKey(request.getSessionKey())
.tid(request.getTid())
+ .properties(request.getProperties())
.build();
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 3683b5a1e..032e7303c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.sort.util;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -96,7 +95,7 @@ public class ExtractNodeUtils {
case MONGODB:
return createExtractNode((MongoDBSource) sourceInfo);
case TUBEMQ:
- return createExtractNode((TubeMQSource)sourceInfo);
+ return createExtractNode((TubeMQSource) sourceInfo);
default:
throw new IllegalArgumentException(
String.format("Unsupported sourceType=%s to create
extractNode", sourceType));
@@ -132,13 +131,13 @@ public class ExtractNodeUtils {
boolean incrementalSnapshotEnabled = true;
// TODO Needs to be configurable for those parameters
- Map<String, String> properties = Maps.newHashMap();
+ Map<String, String> properties =
binlogSource.getProperties().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
if (binlogSource.isAllMigration()) {
// Unique properties when migrate all tables in database
incrementalSnapshotEnabled = false;
properties.put("migrate-all", "true");
}
- properties.put("append-mode", "true");
if (StringUtils.isEmpty(primaryKey)) {
incrementalSnapshotEnabled = false;
properties.put("scan.incremental.snapshot.enabled", "false");
@@ -208,12 +207,13 @@ public class ExtractNodeUtils {
}
final String primaryKey = kafkaSource.getPrimaryKey();
String groupId = kafkaSource.getGroupId();
-
+ Map<String, String> properties =
kafkaSource.getProperties().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
return new KafkaExtractNode(id,
name,
fieldInfos,
null,
- Maps.newHashMap(),
+ properties,
topic,
bootstrapServers,
format,
@@ -268,12 +268,13 @@ public class ExtractNodeUtils {
final String primaryKey = pulsarSource.getPrimaryKey();
final String serviceUrl = pulsarSource.getServiceUrl();
final String adminUrl = pulsarSource.getAdminUrl();
-
+ Map<String, String> properties =
pulsarSource.getProperties().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
return new PulsarExtractNode(id,
name,
fieldInfos,
null,
- Maps.newHashMap(),
+ properties,
fullTopicName,
adminUrl,
serviceUrl,
@@ -295,7 +296,9 @@ public class ExtractNodeUtils {
List<FieldInfo> fields = streamFields.stream()
.map(streamFieldInfo ->
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
.collect(Collectors.toList());
- return new PostgresExtractNode(id, name, fields, null, null,
+ Map<String, String> properties =
postgreSQLSource.getProperties().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
+ return new PostgresExtractNode(id, name, fields, null, properties,
postgreSQLSource.getPrimaryKey(),
postgreSQLSource.getTableNameList(),
postgreSQLSource.getHostname(), postgreSQLSource.getUsername(),
postgreSQLSource.getPassword(), postgreSQLSource.getDatabase(),
@@ -316,10 +319,10 @@ public class ExtractNodeUtils {
.map(streamFieldInfo ->
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
.collect(Collectors.toList());
- Map<String, String> properties = Maps.newHashMap();
ScanStartUpMode scanStartupMode =
StringUtils.isBlank(source.getScanStartupMode())
? null : ScanStartUpMode.forName(source.getScanStartupMode());
-
+ Map<String, String> properties =
source.getProperties().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
return new OracleExtractNode(
name,
name,
@@ -351,7 +354,8 @@ public class ExtractNodeUtils {
.map(fieldInfo ->
FieldInfoUtils.parseStreamFieldInfo(fieldInfo, name))
.collect(Collectors.toList());
- Map<String, String> properties = Maps.newHashMap();
+ Map<String, String> properties =
source.getProperties().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
return new SqlServerExtractNode(
name,
name,
@@ -382,7 +386,8 @@ public class ExtractNodeUtils {
List<FieldInfo> fieldInfos = streamFields.stream()
.map(streamFieldInfo ->
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
.collect(Collectors.toList());
- Map<String, String> properties = Maps.newHashMap();
+ Map<String, String> properties =
source.getProperties().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
return new MongoExtractNode(
name,
name,
@@ -409,13 +414,14 @@ public class ExtractNodeUtils {
List<FieldInfo> fieldInfos = streamFields.stream()
.map(streamFieldInfo ->
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
.collect(Collectors.toList());
- Map<String, String> properties = Maps.newHashMap();
+ Map<String, String> properties =
source.getProperties().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
return new TubeMQExtractNode(
name,
name,
fieldInfos,
null,
- properties,
+ properties,
source.getMasterRpc(),
source.getTopic(),
source.getSerializationType(),
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
index 056c55043..716886e87 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.source;
+import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSource;
@@ -28,6 +29,8 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import java.util.Map;
+
/**
* Stream source service test
*/
@@ -50,6 +53,9 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
String sourceName = "stream_source_service_test";
sourceInfo.setSourceName(sourceName);
sourceInfo.setSourceType(SourceType.BINLOG.getType());
+ Map<String, Object> properties = Maps.newLinkedHashMap();
+ properties.put("append-mode", "true");
+ sourceInfo.setProperties(properties);
return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
}
@@ -82,7 +88,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
MySQLBinlogSourceRequest request =
CommonBeanUtils.copyProperties(binlogResponse,
MySQLBinlogSourceRequest::new);
boolean result = sourceService.update(request, GLOBAL_OPERATOR);
- Assertions.assertTrue(result);
+
Assertions.assertTrue("true".equals(binlogResponse.getProperties().get("append-mode"))
&& result);
sourceService.delete(id, GLOBAL_OPERATOR);
}