This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new de842b044f [INLONG-11911][Sort] SortStandalone supports routing
PB-format data streams to Kafka and Pulsar (#11913)
de842b044f is described below
commit de842b044fc10ee5fbd392aa2dd55573d2047359
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jul 7 10:19:06 2025 +0800
[INLONG-11911][Sort] SortStandalone supports routing PB-format data streams
to Kafka and Pulsar (#11913)
* [INLONG-11911][Sort] SortStandalone supports routing PB-format data
streams to Kafka and Pulsar
* fix code format
* fix spotless
* fix spotless
---
.../common/constant/DeserializationType.java | 1 +
.../sort/dataflow/dataType/DataTypeConfig.java | 1 +
.../sort/dataflow/dataType/PbConfig.java} | 20 +++++---
.../service/datatype/PbDataTypeOperator.java | 58 ++++++++++++++++++++++
.../sdk/transform/decode/PbSourceDecoder.java | 7 ++-
.../sdk/transform/process/TransformProcessor.java | 47 ++++++++++++++++++
.../process/processor/TestAny2PbProcessor.java | 4 +-
.../process/processor/TestPb2CsvProcessor.java | 20 ++++----
.../inlong/sort/standalone/sink/SinkContext.java | 9 ++++
.../sink/cls/DefaultEvent2LogItemHandler.java | 1 +
.../DefaultEvent2IndexRequestHandler.java | 6 ++-
.../sort/standalone/sink/http/HttpIdConfig.java | 16 +++++-
.../sort/standalone/sink/kafka/KafkaIdConfig.java | 14 ++++--
.../standalone/sink/pulsar/PulsarIdConfig.java | 18 +++++--
14 files changed, 193 insertions(+), 29 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
b/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
index 31c7fb26e4..0109f2f992 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
@@ -24,4 +24,5 @@ public class DeserializationType {
public static final String RAW = "raw";
public static final String CSV = "csv";
public static final String KV = "kv";
+ public static final String PB = "pb";
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
index 6f5f03fb48..c81c6a5919 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
@JsonSubTypes({
@JsonSubTypes.Type(value = CsvConfig.class, name =
DeserializationType.CSV),
@JsonSubTypes.Type(value = KvConfig.class, name =
DeserializationType.KV),
+ @JsonSubTypes.Type(value = PbConfig.class, name =
DeserializationType.PB),
})
public interface DataTypeConfig extends Serializable {
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/PbConfig.java
similarity index 68%
copy from
inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
copy to
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/PbConfig.java
index 31c7fb26e4..f9e880e619 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/PbConfig.java
@@ -15,13 +15,19 @@
* limitations under the License.
*/
-package org.apache.inlong.common.constant;
+package org.apache.inlong.common.pojo.sort.dataflow.dataType;
-public class DeserializationType {
+import lombok.Data;
- public static final String INLONG_MSG = "inlong_msg";
- public static final String INLONG_MSG_PB = "inlong_msg_pb";
- public static final String RAW = "raw";
- public static final String CSV = "csv";
- public static final String KV = "kv";
+/**
+ * Data type configuration for PB (protobuf) encoded source data.
+ *
+ * <p>Registered in {@link DataTypeConfig}'s {@code @JsonSubTypes} under the name
+ * {@code DeserializationType.PB} ("pb"). Sort standalone's SinkContext combines these three
+ * properties with the source encoding type to build a PbSourceInfo for the PB source decoder.
+ */
+@Data
+public class PbConfig implements DataTypeConfig {
+
+ /**
+ * Serialization version of this config class.
+ */
+ private static final long serialVersionUID = -160839329020565053L;
+
+ /**
+ * Protobuf schema description handed to the PB decoder.
+ * NOTE(review): presumably Base64-encoded, like the transformBase64 value given to PbSinkInfo
+ * in the transform-sdk tests — confirm.
+ */
+ private String protoDescription;
+ /**
+ * Message type the decoder treats as the root of each payload.
+ * NOTE(review): inferred from the name — confirm simple vs. fully-qualified name format.
+ */
+ private String rootMessageType;
+ /**
+ * NOTE(review): presumably the path of the repeated node whose elements become output rows
+ * (cf. $child in the transform SQL tests) — confirm.
+ */
+ private String rowsNodePath;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/PbDataTypeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/PbDataTypeOperator.java
new file mode 100644
index 0000000000..19b5135f54
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/PbDataTypeOperator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.datatype;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Service
+public class PbDataTypeOperator implements DataTypeOperator {
+
+ @Override
+ public boolean accept(DataTypeEnum type) {
+ return DataTypeEnum.PB.equals(type);
+ }
+
+ @Override
+ public List<FieldInfo> parseFields(String str, InlongStreamInfo
streamInfo) throws Exception {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public DataTypeConfig getDataTypeConfig(InlongStreamInfo streamInfo) {
+ return new PbConfig();
+ }
+
+ @Override
+ public Map<String, Object> parseTransform(InlongStreamInfo streamInfo,
List<SinkField> fieldList,
+ String transformSql,
+ String data) {
+ return new ConcurrentHashMap<>();
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 021c625fd4..43db490232 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -39,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
* PbSourceDecoder
*
*/
-public class PbSourceDecoder extends SourceDecoder<byte[]> {
+public class PbSourceDecoder extends SourceDecoder<String> {
private static final Logger LOG =
LoggerFactory.getLogger(PbSourceDecoder.class);
@@ -151,4 +151,9 @@ public class PbSourceDecoder extends SourceDecoder<byte[]> {
return null;
}
}
+
+    /**
+     * Decodes a Base64 text payload by converting it to bytes and delegating to the byte-level
+     * {@code decode(byte[], Context)} overload.
+     *
+     * <p>Overrides the generic {@code decode(Input, Context)} now that this decoder is a
+     * {@code SourceDecoder<String>}. A {@code null} input or malformed Base64 text yields
+     * {@code null} (the malformed case is logged) rather than throwing, so callers that null-check
+     * the decoded result can skip the record.
+     *
+     * @param input Base64 text of the serialized message; may be {@code null}
+     * @param context transform context passed through to the byte-level decoder
+     * @return the decoded source data, or {@code null} if the input cannot be decoded
+     */
+    @Override
+    public SourceData decode(String input, Context context) {
+        if (input == null) {
+            return null;
+        }
+        byte[] srcBytes;
+        try {
+            srcBytes = Base64.getDecoder().decode(input);
+        } catch (IllegalArgumentException e) {
+            // Base64.Decoder throws IllegalArgumentException on invalid Base64 text
+            LOG.error("fail to decode base64 pb input", e);
+            return null;
+        }
+        return decode(srcBytes, context);
+    }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 6cfd3cd5f7..13cc17010c 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -195,4 +195,51 @@ public class TransformProcessor<I, O> {
return sinkDatas;
}
+ public List<O> transformForBytes(byte[] input, Map<String, Object>
extParams) {
+ Context context = new Context(config.getConfiguration(), extParams);
+
+ // decode
+ SourceData sourceData = this.decoder.decode(input, context);
+ if (sourceData == null) {
+ return null;
+ }
+
+ List<O> sinkDatas = new ArrayList<>(sourceData.getRowCount());
+ for (int i = 0; i < sourceData.getRowCount(); i++) {
+
+ // where check
+ if (this.where != null && !this.where.check(sourceData, i,
context)) {
+ continue;
+ }
+
+ // parse value
+ DefaultSinkData sinkData = new DefaultSinkData();
+ for (ValueParserNode node : this.selectItems) {
+ String fieldName = node.getFieldName();
+ ValueParser parser = node.getParser();
+ if (parser == null || StringUtils.equals(fieldName,
SinkEncoder.ALL_SOURCE_FIELD_SIGN)) {
+ sinkData.addField(fieldName, "");
+ continue;
+ }
+ try {
+ Object fieldValue = parser.parse(sourceData, i, context);
+ if (fieldValue == null) {
+ sinkData.addField(fieldName, "");
+ } else {
+ sinkData.addField(fieldName, fieldValue.toString());
+ }
+ } catch (Throwable t) {
+ sinkData.addField(fieldName, "");
+ }
+ }
+
+ if (this.sinkFieldList != null) {
+ sinkData.setKeyList(this.sinkFieldList);
+ }
+ // encode
+ sinkDatas.add(this.encoder.encode(sinkData, context));
+ }
+ return sinkDatas;
+ }
+
}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAny2PbProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAny2PbProcessor.java
index 57353c3c61..1c63109fca 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAny2PbProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAny2PbProcessor.java
@@ -44,11 +44,11 @@ public class TestAny2PbProcessor extends
AbstractProcessorTestBase {
PbSinkInfo pbSink = new PbSinkInfo("UTF-8", transformBase64, fields);
String transformSql = "select
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
TransformConfig config = new TransformConfig(transformSql);
- TransformProcessor<byte[], byte[]> processor = TransformProcessor
+ TransformProcessor<String, byte[]> processor = TransformProcessor
.create(config, SourceDecoderFactory.createPbDecoder(pbSource),
SinkEncoderFactory.createPbEncoder(pbSink));
byte[] srcBytes = this.getPbTestData();
- List<byte[]> output = processor.transform(srcBytes);
+ List<byte[]> output = processor.transformForBytes(srcBytes, new
HashMap<>());
Assert.assertEquals(2, output.size());
// case1:
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2CsvProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2CsvProcessor.java
index ff652bff12..2741cb0381 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2CsvProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2CsvProcessor.java
@@ -42,11 +42,11 @@ public class TestPb2CsvProcessor extends
AbstractProcessorTestBase {
String transformSql = "select
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor<byte[], String> processor = TransformProcessor
+ TransformProcessor<String, String> processor = TransformProcessor
.create(config, SourceDecoderFactory.createPbDecoder(pbSource),
SinkEncoderFactory.createCsvEncoder(csvSink));
byte[] srcBytes = this.getPbTestData();
- List<String> output = processor.transform(srcBytes);
+ List<String> output = processor.transformForBytes(srcBytes, new
HashMap<>());
Assert.assertEquals(2, output.size());
Assert.assertEquals(output.get(0), "sid|1|1713243918000|msgValue4");
Assert.assertEquals(output.get(1), "sid|1|1713243918002|msgValue42");
@@ -61,11 +61,11 @@ public class TestPb2CsvProcessor extends
AbstractProcessorTestBase {
String transformSql = "select
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor<byte[], String> processor = TransformProcessor
+ TransformProcessor<String, String> processor = TransformProcessor
.create(config, SourceDecoderFactory.createPbDecoder(pbSource),
SinkEncoderFactory.createCsvEncoder(csvSink));
byte[] srcBytes = this.getPbTestData();
- List<String> output = processor.transform(srcBytes, new HashMap<>());
+ List<String> output = processor.transformForBytes(srcBytes, new
HashMap<>());
Assert.assertEquals(1, output.size());
Assert.assertEquals(output.get(0), "sid|1|1713243918002|msgValue4");
}
@@ -85,11 +85,11 @@ public class TestPb2CsvProcessor extends
AbstractProcessorTestBase {
+ "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)";
TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor<byte[], String> processor = TransformProcessor
+ TransformProcessor<String, String> processor = TransformProcessor
.create(config, SourceDecoderFactory.createPbDecoder(pbSource),
SinkEncoderFactory.createCsvEncoder(csvSink));
byte[] srcBytes = this.getPbTestData();
- List<String> output = processor.transform(srcBytes, new HashMap<>());
+ List<String> output = processor.transformForBytes(srcBytes, new
HashMap<>());
Assert.assertEquals(1, output.size());
Assert.assertEquals(output.get(0), "sid|2|3426487836002|msgValue4");
}
@@ -104,11 +104,11 @@ public class TestPb2CsvProcessor extends
AbstractProcessorTestBase {
+ "concat($root.sid,$root.packageID,$child.msgTime,$child.msg)
msg,$root.msgs.msgTime.msg from source";
TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor<byte[], String> processor = TransformProcessor
+ TransformProcessor<String, String> processor = TransformProcessor
.create(config, SourceDecoderFactory.createPbDecoder(pbSource),
SinkEncoderFactory.createCsvEncoder(csvSink));
byte[] srcBytes = this.getPbTestData();
- List<String> output = processor.transform(srcBytes, new HashMap<>());
+ List<String> output = processor.transformForBytes(srcBytes, new
HashMap<>());
Assert.assertTrue(output.size() == 2);
Assert.assertEquals(output.get(0),
"sid|1|1713243918000|sid11713243918000msgValue4");
Assert.assertEquals(output.get(1),
"sid|1|1713243918002|sid11713243918002msgValue42");
@@ -123,11 +123,11 @@ public class TestPb2CsvProcessor extends
AbstractProcessorTestBase {
String transformSql = "select now() from source";
TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor<byte[], String> processor = TransformProcessor
+ TransformProcessor<String, String> processor = TransformProcessor
.create(config, SourceDecoderFactory.createPbDecoder(pbSource),
SinkEncoderFactory.createCsvEncoder(csvSink));
byte[] srcBytes = this.getPbTestData();
- List<String> output = processor.transform(srcBytes, new HashMap<>());
+ List<String> output = processor.transformForBytes(srcBytes, new
HashMap<>());
Assert.assertEquals(2, output.size());
}
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 9b5e4bc4dd..8ee86a1ec4 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -24,6 +24,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import
org.apache.inlong.common.pojo.sort.dataflow.field.format.BasicFormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
@@ -33,6 +34,7 @@ import
org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
@@ -236,6 +238,13 @@ public class SinkContext {
.escapeChar(kvConfig.getEscapeChar())
.build();
return SourceDecoderFactory.createKvDecoder(kvSourceInfo);
+ } else if (dataTypeConfig instanceof PbConfig) {
+ PbConfig pbConfig = (PbConfig) dataTypeConfig;
+ PbSourceInfo pbSourceInfo = new
PbSourceInfo(sourceConfig.getEncodingType(),
+ pbConfig.getProtoDescription(),
+ pbConfig.getRootMessageType(),
+ pbConfig.getRowsNodePath());
+ return SourceDecoderFactory.createPbDecoder(pbSourceInfo);
} else {
throw new IllegalArgumentException("do not support data type=" +
dataTypeConfig.getClass().getName());
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
index 433e3c24dd..8857204a4e 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
@@ -167,6 +167,7 @@ public class DefaultEvent2LogItemHandler implements
IEvent2LogItemHandler {
// prepare values
String stringValues = this.getStringValues(event, idConfig);
Map<String, Object> extParams = new ConcurrentHashMap<>();
+ extParams.putAll(context.getSinkContext().getParameters());
event.getHeaders().forEach((k, v) -> extParams.put(k, v));
List<Map<String, Object>> resultList =
processor.transform(stringValues, extParams);
if (resultList == null) {
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
index 0d91668e53..a8ab658c9e 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
@@ -36,6 +36,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -157,7 +158,10 @@ public class DefaultEvent2IndexRequestHandler implements
IEvent2IndexRequestHand
byte[] bodyBytes = event.getBody();
String strContext = new String(bodyBytes, idConfig.getCharset());
// build
- List<Map<String, Object>> esData = processor.transform(strContext);
+ Map<String, Object> extParams = new ConcurrentHashMap<>();
+ extParams.putAll(context.getSinkContext().getParameters());
+ event.getHeaders().forEach((k, v) -> extParams.put(k, v));
+ List<Map<String, Object>> esData = processor.transform(strContext,
extParams);
return esData.stream()
.map(data -> {
EsIndexRequest indexRequest = new
EsIndexRequest(indexName, event);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
index e209b511b2..2cd0e45a8d 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
@@ -18,6 +18,9 @@
package org.apache.inlong.sort.standalone.sink.http;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.HttpSinkConfig;
import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
@@ -42,11 +45,13 @@ import java.util.stream.Collectors;
@Slf4j
public class HttpIdConfig extends IdConfig {
+ public static final String DEFAULT_SEPARATOR = "|";
+
private String path;
private String method;
private Map<String, String> headers;
private Integer maxRetryTimes;
- private String separator = "|";
+ private String separator;
private List<String> fieldList;
private Charset sourceCharset;
private Charset sinkCharset;
@@ -73,6 +78,13 @@ public class HttpIdConfig extends IdConfig {
dataFlowConfig.getSourceConfig().getEncodingType(),
dataFlowConfig.getDataflowId());
sourceCharset = Charset.defaultCharset();
}
+ DataTypeConfig dataTypeConfig =
dataFlowConfig.getSourceConfig().getDataTypeConfig();
+ String separator = DEFAULT_SEPARATOR;
+ if (dataTypeConfig instanceof CsvConfig) {
+ separator = String.valueOf(((CsvConfig)
dataTypeConfig).getDelimiter());
+ } else if (dataTypeConfig instanceof KvConfig) {
+ separator = String.valueOf(((KvConfig)
dataTypeConfig).getEntrySplitter());
+ }
return HttpIdConfig.builder()
.inlongGroupId(dataFlowConfig.getInlongGroupId())
.inlongStreamId(dataFlowConfig.getInlongStreamId())
@@ -80,7 +92,7 @@ public class HttpIdConfig extends IdConfig {
.method(sinkConfig.getMethod())
.headers(sinkConfig.getHeaders())
.maxRetryTimes(sinkConfig.getMaxRetryTimes())
- .separator("|")
+ .separator(separator)
.fieldList(fields)
.sinkCharset(sinkCharset)
.sourceCharset(sourceCharset)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index c6d29a9c6d..7ded1917f9 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -22,6 +22,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
@@ -47,9 +48,10 @@ public class KafkaIdConfig extends IdConfig {
public static final String DEFAULT_SEPARATOR = "|";
private String uid;
- private String separator = "|";
+ private String separator;
private String topic;
- private DataTypeEnum dataType = DataTypeEnum.TEXT;
+ private DataTypeEnum dataType;
+ private DataFlowConfig dataFlowConfig;
public KafkaIdConfig(Map<String, String> idParam) {
this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
@@ -65,10 +67,15 @@ public class KafkaIdConfig extends IdConfig {
KafkaSinkConfig sinkConfig = (KafkaSinkConfig)
dataFlowConfig.getSinkConfig();
DataTypeConfig dataTypeConfig =
dataFlowConfig.getSourceConfig().getDataTypeConfig();
String separator = DEFAULT_SEPARATOR;
+ DataTypeEnum dataType = DataTypeEnum.TEXT;
if (dataTypeConfig instanceof CsvConfig) {
separator = String.valueOf(((CsvConfig)
dataTypeConfig).getDelimiter());
+ dataType = DataTypeEnum.TEXT;
} else if (dataTypeConfig instanceof KvConfig) {
separator = String.valueOf(((KvConfig)
dataTypeConfig).getEntrySplitter());
+ dataType = DataTypeEnum.TEXT;
+ } else if (dataTypeConfig instanceof PbConfig) {
+ dataType = DataTypeEnum.PB;
}
return KafkaIdConfig.builder()
@@ -76,8 +83,9 @@ public class KafkaIdConfig extends IdConfig {
.inlongStreamId(dataFlowConfig.getInlongStreamId())
.uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(),
dataFlowConfig.getInlongStreamId()))
.topic(sinkConfig.getTopicName())
- .dataType(DataTypeEnum.TEXT)
+ .dataType(dataType)
.separator(separator)
+ .dataFlowConfig(dataFlowConfig)
.build();
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index 7d4f30f752..dcdb2c8aa7 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -21,6 +21,8 @@ import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig;
import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
@@ -48,9 +50,10 @@ public class PulsarIdConfig extends IdConfig {
private static final String DEFAULT_INLONG_STREAM = "1";
private String uid;
- private String separator = "|";
+ private String separator;
private String topic;
- private DataTypeEnum dataType = DataTypeEnum.TEXT;
+ private DataTypeEnum dataType;
+ private DataFlowConfig dataFlowConfig;
public PulsarIdConfig(Map<String, String> idParam) {
this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
@@ -66,16 +69,25 @@ public class PulsarIdConfig extends IdConfig {
PulsarSinkConfig sinkConfig = (PulsarSinkConfig)
dataFlowConfig.getSinkConfig();
DataTypeConfig dataTypeConfig =
dataFlowConfig.getSourceConfig().getDataTypeConfig();
String separator = DEFAULT_SEPARATOR;
+ DataTypeEnum dataType = DataTypeEnum.TEXT;
if (dataTypeConfig instanceof CsvConfig) {
separator = String.valueOf(((CsvConfig)
dataTypeConfig).getDelimiter());
+ dataType = DataTypeEnum.TEXT;
+ } else if (dataTypeConfig instanceof KvConfig) {
+ separator = String.valueOf(((KvConfig)
dataTypeConfig).getEntrySplitter());
+ dataType = DataTypeEnum.TEXT;
+ } else if (dataTypeConfig instanceof PbConfig) {
+ dataType = DataTypeEnum.PB;
}
+
return PulsarIdConfig.builder()
.inlongGroupId(dataFlowConfig.getInlongGroupId())
.inlongStreamId(dataFlowConfig.getInlongStreamId())
.uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(),
dataFlowConfig.getInlongStreamId()))
.topic(sinkConfig.getTopic())
- .dataType(DataTypeEnum.TEXT)
+ .dataType(dataType)
.separator(separator)
+ .dataFlowConfig(dataFlowConfig)
.build();
}