This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new de842b044f [INLONG-11911][Sort] SortStandalone supports routing PB-format data streams to Kafka and Pulsar (#11913)
de842b044f is described below

commit de842b044fc10ee5fbd392aa2dd55573d2047359
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jul 7 10:19:06 2025 +0800

    [INLONG-11911][Sort] SortStandalone supports routing PB-format data streams to Kafka and Pulsar (#11913)
    
    * [INLONG-11911][Sort] SortStandalone supports routing PB-format data streams to Kafka and Pulsar
    
    * fix code format
    
    * fix spotless
    
    * fix spotless
---
 .../common/constant/DeserializationType.java       |  1 +
 .../sort/dataflow/dataType/DataTypeConfig.java     |  1 +
 .../sort/dataflow/dataType/PbConfig.java}          | 20 +++++---
 .../service/datatype/PbDataTypeOperator.java       | 58 ++++++++++++++++++++++
 .../sdk/transform/decode/PbSourceDecoder.java      |  7 ++-
 .../sdk/transform/process/TransformProcessor.java  | 47 ++++++++++++++++++
 .../process/processor/TestAny2PbProcessor.java     |  4 +-
 .../process/processor/TestPb2CsvProcessor.java     | 20 ++++----
 .../inlong/sort/standalone/sink/SinkContext.java   |  9 ++++
 .../sink/cls/DefaultEvent2LogItemHandler.java      |  1 +
 .../DefaultEvent2IndexRequestHandler.java          |  6 ++-
 .../sort/standalone/sink/http/HttpIdConfig.java    | 16 +++++-
 .../sort/standalone/sink/kafka/KafkaIdConfig.java  | 14 ++++--
 .../standalone/sink/pulsar/PulsarIdConfig.java     | 18 +++++--
 14 files changed, 193 insertions(+), 29 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
index 31c7fb26e4..0109f2f992 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
@@ -24,4 +24,5 @@ public class DeserializationType {
     public static final String RAW = "raw";
     public static final String CSV = "csv";
     public static final String KV = "kv";
+    public static final String PB = "pb";
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
index 6f5f03fb48..c81c6a5919 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
 @JsonSubTypes({
         @JsonSubTypes.Type(value = CsvConfig.class, name = 
DeserializationType.CSV),
         @JsonSubTypes.Type(value = KvConfig.class, name = 
DeserializationType.KV),
+        @JsonSubTypes.Type(value = PbConfig.class, name = 
DeserializationType.PB),
 })
 public interface DataTypeConfig extends Serializable {
 
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/PbConfig.java
similarity index 68%
copy from inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/PbConfig.java
index 31c7fb26e4..f9e880e619 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/PbConfig.java
@@ -15,13 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.constant;
+package org.apache.inlong.common.pojo.sort.dataflow.dataType;
 
-public class DeserializationType {
+import lombok.Data;
 
-    public static final String INLONG_MSG = "inlong_msg";
-    public static final String INLONG_MSG_PB = "inlong_msg_pb";
-    public static final String RAW = "raw";
-    public static final String CSV = "csv";
-    public static final String KV = "kv";
+@Data
+public class PbConfig implements DataTypeConfig {
+
+    /**
+     * serialVersionUID long
+     */
+    private static final long serialVersionUID = -160839329020565053L;
+
+    private String protoDescription;
+    private String rootMessageType;
+    private String rowsNodePath;
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/PbDataTypeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/PbDataTypeOperator.java
new file mode 100644
index 0000000000..19b5135f54
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/PbDataTypeOperator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.datatype;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Service
+public class PbDataTypeOperator implements DataTypeOperator {
+
+    @Override
+    public boolean accept(DataTypeEnum type) {
+        return DataTypeEnum.PB.equals(type);
+    }
+
+    @Override
+    public List<FieldInfo> parseFields(String str, InlongStreamInfo 
streamInfo) throws Exception {
+        return new ArrayList<>();
+    }
+
+    @Override
+    public DataTypeConfig getDataTypeConfig(InlongStreamInfo streamInfo) {
+        return new PbConfig();
+    }
+
+    @Override
+    public Map<String, Object> parseTransform(InlongStreamInfo streamInfo, 
List<SinkField> fieldList,
+            String transformSql,
+            String data) {
+        return new ConcurrentHashMap<>();
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 021c625fd4..43db490232 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -39,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * PbSourceDecoder
  * 
  */
-public class PbSourceDecoder extends SourceDecoder<byte[]> {
+public class PbSourceDecoder extends SourceDecoder<String> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PbSourceDecoder.class);
 
@@ -151,4 +151,9 @@ public class PbSourceDecoder extends SourceDecoder<byte[]> {
             return null;
         }
     }
+
+    public SourceData decode(String input, Context context) {
+        byte[] srcBytes = Base64.getDecoder().decode(input);
+        return decode(srcBytes, context);
+    }
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 6cfd3cd5f7..13cc17010c 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -195,4 +195,51 @@ public class TransformProcessor<I, O> {
         return sinkDatas;
     }
 
+    public List<O> transformForBytes(byte[] input, Map<String, Object> 
extParams) {
+        Context context = new Context(config.getConfiguration(), extParams);
+
+        // decode
+        SourceData sourceData = this.decoder.decode(input, context);
+        if (sourceData == null) {
+            return null;
+        }
+
+        List<O> sinkDatas = new ArrayList<>(sourceData.getRowCount());
+        for (int i = 0; i < sourceData.getRowCount(); i++) {
+
+            // where check
+            if (this.where != null && !this.where.check(sourceData, i, 
context)) {
+                continue;
+            }
+
+            // parse value
+            DefaultSinkData sinkData = new DefaultSinkData();
+            for (ValueParserNode node : this.selectItems) {
+                String fieldName = node.getFieldName();
+                ValueParser parser = node.getParser();
+                if (parser == null || StringUtils.equals(fieldName, 
SinkEncoder.ALL_SOURCE_FIELD_SIGN)) {
+                    sinkData.addField(fieldName, "");
+                    continue;
+                }
+                try {
+                    Object fieldValue = parser.parse(sourceData, i, context);
+                    if (fieldValue == null) {
+                        sinkData.addField(fieldName, "");
+                    } else {
+                        sinkData.addField(fieldName, fieldValue.toString());
+                    }
+                } catch (Throwable t) {
+                    sinkData.addField(fieldName, "");
+                }
+            }
+
+            if (this.sinkFieldList != null) {
+                sinkData.setKeyList(this.sinkFieldList);
+            }
+            // encode
+            sinkDatas.add(this.encoder.encode(sinkData, context));
+        }
+        return sinkDatas;
+    }
+
 }
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAny2PbProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAny2PbProcessor.java
index 57353c3c61..1c63109fca 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAny2PbProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAny2PbProcessor.java
@@ -44,11 +44,11 @@ public class TestAny2PbProcessor extends AbstractProcessorTestBase {
         PbSinkInfo pbSink = new PbSinkInfo("UTF-8", transformBase64, fields);
         String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
         TransformConfig config = new TransformConfig(transformSql);
-        TransformProcessor<byte[], byte[]> processor = TransformProcessor
+        TransformProcessor<String, byte[]> processor = TransformProcessor
                 .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
                         SinkEncoderFactory.createPbEncoder(pbSink));
         byte[] srcBytes = this.getPbTestData();
-        List<byte[]> output = processor.transform(srcBytes);
+        List<byte[]> output = processor.transformForBytes(srcBytes, new 
HashMap<>());
         Assert.assertEquals(2, output.size());
 
         // case1:
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2CsvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2CsvProcessor.java
index ff652bff12..2741cb0381 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2CsvProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2CsvProcessor.java
@@ -42,11 +42,11 @@ public class TestPb2CsvProcessor extends AbstractProcessorTestBase {
         String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
         TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor<byte[], String> processor = TransformProcessor
+        TransformProcessor<String, String> processor = TransformProcessor
                 .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
                         SinkEncoderFactory.createCsvEncoder(csvSink));
         byte[] srcBytes = this.getPbTestData();
-        List<String> output = processor.transform(srcBytes);
+        List<String> output = processor.transformForBytes(srcBytes, new 
HashMap<>());
         Assert.assertEquals(2, output.size());
         Assert.assertEquals(output.get(0), "sid|1|1713243918000|msgValue4");
         Assert.assertEquals(output.get(1), "sid|1|1713243918002|msgValue42");
@@ -61,11 +61,11 @@ public class TestPb2CsvProcessor extends AbstractProcessorTestBase {
         String transformSql = "select 
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
         TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor<byte[], String> processor = TransformProcessor
+        TransformProcessor<String, String> processor = TransformProcessor
                 .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
                         SinkEncoderFactory.createCsvEncoder(csvSink));
         byte[] srcBytes = this.getPbTestData();
-        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        List<String> output = processor.transformForBytes(srcBytes, new 
HashMap<>());
         Assert.assertEquals(1, output.size());
         Assert.assertEquals(output.get(0), "sid|1|1713243918002|msgValue4");
     }
@@ -85,11 +85,11 @@ public class TestPb2CsvProcessor extends AbstractProcessorTestBase {
                 + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)";
         TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor<byte[], String> processor = TransformProcessor
+        TransformProcessor<String, String> processor = TransformProcessor
                 .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
                         SinkEncoderFactory.createCsvEncoder(csvSink));
         byte[] srcBytes = this.getPbTestData();
-        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        List<String> output = processor.transformForBytes(srcBytes, new 
HashMap<>());
         Assert.assertEquals(1, output.size());
         Assert.assertEquals(output.get(0), "sid|2|3426487836002|msgValue4");
     }
@@ -104,11 +104,11 @@ public class TestPb2CsvProcessor extends AbstractProcessorTestBase {
                 + "concat($root.sid,$root.packageID,$child.msgTime,$child.msg) 
msg,$root.msgs.msgTime.msg from source";
         TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor<byte[], String> processor = TransformProcessor
+        TransformProcessor<String, String> processor = TransformProcessor
                 .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
                         SinkEncoderFactory.createCsvEncoder(csvSink));
         byte[] srcBytes = this.getPbTestData();
-        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        List<String> output = processor.transformForBytes(srcBytes, new 
HashMap<>());
         Assert.assertTrue(output.size() == 2);
         Assert.assertEquals(output.get(0), 
"sid|1|1713243918000|sid11713243918000msgValue4");
         Assert.assertEquals(output.get(1), 
"sid|1|1713243918002|sid11713243918002msgValue42");
@@ -123,11 +123,11 @@ public class TestPb2CsvProcessor extends AbstractProcessorTestBase {
         String transformSql = "select now() from source";
         TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor<byte[], String> processor = TransformProcessor
+        TransformProcessor<String, String> processor = TransformProcessor
                 .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
                         SinkEncoderFactory.createCsvEncoder(csvSink));
         byte[] srcBytes = this.getPbTestData();
-        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        List<String> output = processor.transformForBytes(srcBytes, new 
HashMap<>());
         Assert.assertEquals(2, output.size());
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 9b5e4bc4dd..8ee86a1ec4 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -24,6 +24,7 @@ import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
 import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.BasicFormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
@@ -33,6 +34,7 @@ import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
 import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.TransformConfig;
 import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
@@ -236,6 +238,13 @@ public class SinkContext {
                     .escapeChar(kvConfig.getEscapeChar())
                     .build();
             return SourceDecoderFactory.createKvDecoder(kvSourceInfo);
+        } else if (dataTypeConfig instanceof PbConfig) {
+            PbConfig pbConfig = (PbConfig) dataTypeConfig;
+            PbSourceInfo pbSourceInfo = new 
PbSourceInfo(sourceConfig.getEncodingType(),
+                    pbConfig.getProtoDescription(),
+                    pbConfig.getRootMessageType(),
+                    pbConfig.getRowsNodePath());
+            return SourceDecoderFactory.createPbDecoder(pbSourceInfo);
         } else {
             throw new IllegalArgumentException("do not support data type=" + 
dataTypeConfig.getClass().getName());
         }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
index 433e3c24dd..8857204a4e 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
@@ -167,6 +167,7 @@ public class DefaultEvent2LogItemHandler implements IEvent2LogItemHandler {
         // prepare values
         String stringValues = this.getStringValues(event, idConfig);
         Map<String, Object> extParams = new ConcurrentHashMap<>();
+        extParams.putAll(context.getSinkContext().getParameters());
         event.getHeaders().forEach((k, v) -> extParams.put(k, v));
         List<Map<String, Object>> resultList = 
processor.transform(stringValues, extParams);
         if (resultList == null) {
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
index 0d91668e53..a8ab658c9e 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
@@ -36,6 +36,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -157,7 +158,10 @@ public class DefaultEvent2IndexRequestHandler implements IEvent2IndexRequestHand
         byte[] bodyBytes = event.getBody();
         String strContext = new String(bodyBytes, idConfig.getCharset());
         // build
-        List<Map<String, Object>> esData = processor.transform(strContext);
+        Map<String, Object> extParams = new ConcurrentHashMap<>();
+        extParams.putAll(context.getSinkContext().getParameters());
+        event.getHeaders().forEach((k, v) -> extParams.put(k, v));
+        List<Map<String, Object>> esData = processor.transform(strContext, 
extParams);
         return esData.stream()
                 .map(data -> {
                     EsIndexRequest indexRequest = new 
EsIndexRequest(indexName, event);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
index e209b511b2..2cd0e45a8d 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
@@ -18,6 +18,9 @@
 package org.apache.inlong.sort.standalone.sink.http;
 
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.HttpSinkConfig;
 import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
@@ -42,11 +45,13 @@ import java.util.stream.Collectors;
 @Slf4j
 public class HttpIdConfig extends IdConfig {
 
+    public static final String DEFAULT_SEPARATOR = "|";
+
     private String path;
     private String method;
     private Map<String, String> headers;
     private Integer maxRetryTimes;
-    private String separator = "|";
+    private String separator;
     private List<String> fieldList;
     private Charset sourceCharset;
     private Charset sinkCharset;
@@ -73,6 +78,13 @@ public class HttpIdConfig extends IdConfig {
                     dataFlowConfig.getSourceConfig().getEncodingType(), 
dataFlowConfig.getDataflowId());
             sourceCharset = Charset.defaultCharset();
         }
+        DataTypeConfig dataTypeConfig = 
dataFlowConfig.getSourceConfig().getDataTypeConfig();
+        String separator = DEFAULT_SEPARATOR;
+        if (dataTypeConfig instanceof CsvConfig) {
+            separator = String.valueOf(((CsvConfig) 
dataTypeConfig).getDelimiter());
+        } else if (dataTypeConfig instanceof KvConfig) {
+            separator = String.valueOf(((KvConfig) 
dataTypeConfig).getEntrySplitter());
+        }
         return HttpIdConfig.builder()
                 .inlongGroupId(dataFlowConfig.getInlongGroupId())
                 .inlongStreamId(dataFlowConfig.getInlongStreamId())
@@ -80,7 +92,7 @@ public class HttpIdConfig extends IdConfig {
                 .method(sinkConfig.getMethod())
                 .headers(sinkConfig.getHeaders())
                 .maxRetryTimes(sinkConfig.getMaxRetryTimes())
-                .separator("|")
+                .separator(separator)
                 .fieldList(fields)
                 .sinkCharset(sinkCharset)
                 .sourceCharset(sourceCharset)
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index c6d29a9c6d..7ded1917f9 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -22,6 +22,7 @@ import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
 import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
@@ -47,9 +48,10 @@ public class KafkaIdConfig extends IdConfig {
     public static final String DEFAULT_SEPARATOR = "|";
 
     private String uid;
-    private String separator = "|";
+    private String separator;
     private String topic;
-    private DataTypeEnum dataType = DataTypeEnum.TEXT;
+    private DataTypeEnum dataType;
+    private DataFlowConfig dataFlowConfig;
 
     public KafkaIdConfig(Map<String, String> idParam) {
         this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
@@ -65,10 +67,15 @@ public class KafkaIdConfig extends IdConfig {
         KafkaSinkConfig sinkConfig = (KafkaSinkConfig) 
dataFlowConfig.getSinkConfig();
         DataTypeConfig dataTypeConfig = 
dataFlowConfig.getSourceConfig().getDataTypeConfig();
         String separator = DEFAULT_SEPARATOR;
+        DataTypeEnum dataType = DataTypeEnum.TEXT;
         if (dataTypeConfig instanceof CsvConfig) {
             separator = String.valueOf(((CsvConfig) 
dataTypeConfig).getDelimiter());
+            dataType = DataTypeEnum.TEXT;
         } else if (dataTypeConfig instanceof KvConfig) {
             separator = String.valueOf(((KvConfig) 
dataTypeConfig).getEntrySplitter());
+            dataType = DataTypeEnum.TEXT;
+        } else if (dataTypeConfig instanceof PbConfig) {
+            dataType = DataTypeEnum.PB;
         }
 
         return KafkaIdConfig.builder()
@@ -76,8 +83,9 @@ public class KafkaIdConfig extends IdConfig {
                 .inlongStreamId(dataFlowConfig.getInlongStreamId())
                 .uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(), 
dataFlowConfig.getInlongStreamId()))
                 .topic(sinkConfig.getTopicName())
-                .dataType(DataTypeEnum.TEXT)
+                .dataType(dataType)
                 .separator(separator)
+                .dataFlowConfig(dataFlowConfig)
                 .build();
     }
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index 7d4f30f752..dcdb2c8aa7 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -21,6 +21,8 @@ import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig;
 import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
@@ -48,9 +50,10 @@ public class PulsarIdConfig extends IdConfig {
     private static final String DEFAULT_INLONG_STREAM = "1";
 
     private String uid;
-    private String separator = "|";
+    private String separator;
     private String topic;
-    private DataTypeEnum dataType = DataTypeEnum.TEXT;
+    private DataTypeEnum dataType;
+    private DataFlowConfig dataFlowConfig;
 
     public PulsarIdConfig(Map<String, String> idParam) {
         this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
@@ -66,16 +69,25 @@ public class PulsarIdConfig extends IdConfig {
         PulsarSinkConfig sinkConfig = (PulsarSinkConfig) 
dataFlowConfig.getSinkConfig();
         DataTypeConfig dataTypeConfig = 
dataFlowConfig.getSourceConfig().getDataTypeConfig();
         String separator = DEFAULT_SEPARATOR;
+        DataTypeEnum dataType = DataTypeEnum.TEXT;
         if (dataTypeConfig instanceof CsvConfig) {
             separator = String.valueOf(((CsvConfig) 
dataTypeConfig).getDelimiter());
+            dataType = DataTypeEnum.TEXT;
+        } else if (dataTypeConfig instanceof KvConfig) {
+            separator = String.valueOf(((KvConfig) 
dataTypeConfig).getEntrySplitter());
+            dataType = DataTypeEnum.TEXT;
+        } else if (dataTypeConfig instanceof PbConfig) {
+            dataType = DataTypeEnum.PB;
         }
+
         return PulsarIdConfig.builder()
                 .inlongGroupId(dataFlowConfig.getInlongGroupId())
                 .inlongStreamId(dataFlowConfig.getInlongStreamId())
                 .uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(), 
dataFlowConfig.getInlongStreamId()))
                 .topic(sinkConfig.getTopic())
-                .dataType(DataTypeEnum.TEXT)
+                .dataType(dataType)
                 .separator(separator)
+                .dataFlowConfig(dataFlowConfig)
                 .build();
 
     }

Reply via email to