This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b38bdc0f9 [INLONG-7934][Manager] Optimize the serializationType to support debezium json (#7935)
b38bdc0f9 is described below

commit b38bdc0f9de7ea0eef4fbdd9bc350fd74603e214
Author: haifxu <[email protected]>
AuthorDate: Fri Apr 28 17:42:17 2023 +0800

    [INLONG-7934][Manager] Optimize the serializationType to support debezium json (#7935)
---
 .../pojo/source/mongodb/MongoDBSourceRequest.java        |  2 ++
 .../manager/pojo/source/oracle/OracleSourceRequest.java  |  2 ++
 .../pojo/source/postgresql/PostgreSQLSourceRequest.java  |  2 ++
 .../pojo/source/sqlserver/SQLServerSourceRequest.java    |  2 ++
 .../manager/service/source/AbstractSourceOperator.java   | 16 ++++++++++++++++
 .../service/source/kafka/KafkaSourceOperator.java        | 10 ++--------
 .../service/source/pulsar/PulsarSourceOperator.java      | 11 +++--------
 .../service/source/tubemq/TubeMQSourceOperator.java      |  9 ++-------
 8 files changed, 31 insertions(+), 23 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java
index 95200c9d9..d1466ce44 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -56,6 +57,7 @@ public class MongoDBSourceRequest extends SourceRequest {
 
     public MongoDBSourceRequest() {
         this.setSourceType(SourceType.MONGODB);
+        this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
     }
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java
index 8cc3c1627..aacd0489b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -68,6 +69,7 @@ public class OracleSourceRequest extends SourceRequest {
 
     public OracleSourceRequest() {
         this.setSourceType(SourceType.ORACLE);
+        this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
     }
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java
index 854a7ef45..15136662b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -73,6 +74,7 @@ public class PostgreSQLSourceRequest extends SourceRequest {
 
     public PostgreSQLSourceRequest() {
         this.setSourceType(SourceType.POSTGRESQL);
+        this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
     }
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java
index 519531378..9dafc11e4 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceRequest.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -68,6 +69,7 @@ public class SQLServerSourceRequest extends SourceRequest {
 
     public SQLServerSourceRequest() {
         this.setSourceType(SourceType.SQLSERVER);
+        this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 3c2584c03..a6a25b5e5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source;
 import com.github.pagehelper.Page;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -306,4 +307,19 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         sourceFieldMapper.insertAll(entityList);
         LOGGER.debug("success to save source fields");
     }
+
+    /**
+     * Resolve the serialization type: prefer the one set on the stream source, else derive it from the data type.
+     *
+     * @param streamSource stream source
+     * @param streamDataType stream data type, may be blank
+     * @return serialization type, or null when neither the source nor the stream data type provides one
+     */
+    protected String getSerializationType(StreamSource streamSource, String streamDataType) {
+        if (StringUtils.isNotBlank(streamSource.getSerializationType())) {
+            return streamSource.getSerializationType();
+        }
+        // keep the isNotBlank guard the former call sites applied before calling DataTypeEnum.forType
+        return StringUtils.isNotBlank(streamDataType) ? DataTypeEnum.forType(streamDataType).getType() : null;
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index bb5e5e3d9..f2a15de59 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -113,10 +113,6 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
             kafkaSource.setSourceName(streamId);
             kafkaSource.setBootstrapServers(bootstrapServers);
             kafkaSource.setTopic(streamInfo.getMqResource());
-            if (StringUtils.isNotBlank(streamInfo.getDataType())) {
-                String serializationType = 
DataTypeEnum.forType(streamInfo.getDataType()).getType();
-                kafkaSource.setSerializationType(serializationType);
-            }
             String topicName = streamInfo.getMqResource();
             if (StringUtils.isBlank(topicName) || topicName.equals(streamId)) {
                 // the default mq resource (stream id) is not sufficient to 
discriminate different kafka topics
@@ -131,10 +127,8 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
                 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) 
{
                     continue;
                 }
-                if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && 
StringUtils.isNotEmpty(
-                        sourceInfo.getSerializationType())) {
-                    
kafkaSource.setSerializationType(sourceInfo.getSerializationType());
-                }
+
+                
kafkaSource.setSerializationType(getSerializationType(sourceInfo, 
streamInfo.getDataType()));
             }
 
             // if the SerializationType is still null, set it to the CSV
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 96fcf1cd0..1cb579a09 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -131,10 +131,6 @@ public class PulsarSourceOperator extends 
AbstractSourceOperator {
             pulsarSource.setAdminUrl(adminUrl);
             pulsarSource.setServiceUrl(serviceUrl);
             pulsarSource.setInlongComponent(true);
-            if (StringUtils.isNotBlank(streamInfo.getDataType())) {
-                String serializationType = 
DataTypeEnum.forType(streamInfo.getDataType()).getType();
-                pulsarSource.setSerializationType(serializationType);
-            }
             
pulsarSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
             pulsarSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
 
@@ -149,10 +145,9 @@ public class PulsarSourceOperator extends 
AbstractSourceOperator {
                 if (!Objects.equal(streamId, sourceInfo.getInlongStreamId())) {
                     continue;
                 }
-                if (StringUtils.isEmpty(pulsarSource.getSerializationType())
-                        && 
StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
-                    
pulsarSource.setSerializationType(sourceInfo.getSerializationType());
-                }
+
+                
pulsarSource.setSerializationType(getSerializationType(sourceInfo, 
streamInfo.getDataType()));
+
                 // currently, only reuse the primary key from Kafka source
                 if (SourceType.KAFKA.equals(sourceInfo.getSourceType())) {
                     pulsarSource.setPrimaryKey(((KafkaSource) 
sourceInfo).getPrimaryKey());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 142977e9f..2dda1f507 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -20,8 +20,6 @@ package org.apache.inlong.manager.service.source.tubemq;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -110,17 +108,14 @@ public class TubeMQSourceOperator extends 
AbstractSourceOperator {
             tubeMQSource.setTopic(streamInfo.getMqResource());
             tubeMQSource.setGroupId(streamId);
             tubeMQSource.setMasterRpc(masterRpc);
-            if (StringUtils.isNotBlank(streamInfo.getDataType())) {
-                String serializationType = 
DataTypeEnum.forType(streamInfo.getDataType()).getType();
-                tubeMQSource.setSerializationType(serializationType);
-            }
             tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
 
             for (StreamSource sourceInfo : streamSources) {
                 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) 
{
                     continue;
                 }
-                
tubeMQSource.setSerializationType(sourceInfo.getSerializationType());
+
+                
tubeMQSource.setSerializationType(getSerializationType(sourceInfo, 
streamInfo.getDataType()));
             }
             tubeMQSource.setFieldList(streamInfo.getFieldList());
             sourceMap.computeIfAbsent(streamId, key -> 
Lists.newArrayList()).add(tubeMQSource);

Reply via email to