This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new eb4b9c0c26 [INLONG-11937][Sort] Allow SortHttp to filter out data in 
TransformFunction (#11938)
eb4b9c0c26 is described below

commit eb4b9c0c2605ac891c67201ce63861038d31e2ef
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jul 21 11:18:08 2025 +0800

    [INLONG-11937][Sort] Allow SortHttp to filter out data in TransformFunction 
(#11938)
---
 .../inlong/sdk/transform/decode/KvUtils.java       |   2 -
 .../sort/standalone/sink/BaseDecoderBuilder.java   |  39 +++++
 .../sort/standalone/sink/CsvDecoderBuilder.java    |  53 +++++++
 .../sort/standalone/sink/DecoderBuilderHolder.java |  54 +++++++
 .../inlong/sort/standalone/sink/EventUtils.java    | 104 +++++++++++++
 ...ttpRequestHandler.java => IDecoderBuilder.java} |  18 +--
 .../sort/standalone/sink/KvDecoderBuilder.java     |  55 +++++++
 .../sort/standalone/sink/PbDecoderBuilder.java     |  42 ++++++
 .../inlong/sort/standalone/sink/SinkContext.java   |  48 +-----
 .../sink/http/DefaultEvent2HttpRequestHandler.java | 164 ++++++++++++++-------
 .../sort/standalone/sink/http/HttpIdConfig.java    |   2 +
 .../sort/standalone/sink/http/HttpSinkContext.java |  67 +++++++++
 .../sink/http/IEvent2HttpRequestHandler.java       |   3 +-
 13 files changed, 536 insertions(+), 115 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
index 60c804c48f..4719d7cf68 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
@@ -115,8 +115,6 @@ public class KvUtils {
                         state = STATE_VALUE;
                         break;
                     case STATE_VALUE:
-                        // throw new IllegalArgumentException("Unexpected 
token " +
-                        // ch + " at position " + i + ".");
                     case STATE_ESCAPING:
                         stringBuilder.append(ch);
                         state = kvState;
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/BaseDecoderBuilder.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/BaseDecoderBuilder.java
new file mode 100644
index 0000000000..fb96bffc96
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/BaseDecoderBuilder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.BasicFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
+
+public abstract class BaseDecoderBuilder implements IDecoderBuilder {
+
+    public FieldInfo convertToTransformFieldInfo(FieldConfig config) {
+        return new FieldInfo(config.getName(), 
deriveTypeConverter(config.getFormatInfo()));
+    }
+
+    public TypeConverter deriveTypeConverter(FormatInfo formatInfo) {
+
+        if (formatInfo instanceof BasicFormatInfo) {
+            return value -> ((BasicFormatInfo<?>) 
formatInfo).deserialize(value);
+        }
+        return value -> value;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/CsvDecoderBuilder.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/CsvDecoderBuilder.java
new file mode 100644
index 0000000000..b42a7ec825
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/CsvDecoderBuilder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.sdk.transform.decode.SourceDecoder;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class CsvDecoderBuilder extends BaseDecoderBuilder {
+
+    @Override
+    public SourceDecoder<String> createSourceDecoder(SourceConfig 
sourceConfig) {
+        List<FieldInfo> fieldInfoList = sourceConfig.getFieldConfigs()
+                .stream()
+                .map(this::convertToTransformFieldInfo)
+                .collect(Collectors.toList());
+
+        DataTypeConfig dataTypeConfig = sourceConfig.getDataTypeConfig();
+        if (dataTypeConfig instanceof CsvConfig) {
+            CsvConfig csvConfig = (CsvConfig) dataTypeConfig;
+            CsvSourceInfo csvSourceInfo = CsvSourceInfo.builder()
+                    .delimiter(csvConfig.getDelimiter())
+                    .escapeChar(csvConfig.getEscapeChar())
+                    .fields(fieldInfoList)
+                    .charset(sourceConfig.getEncodingType())
+                    .build();
+            return SourceDecoderFactory.createCsvDecoder(csvSourceInfo);
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/DecoderBuilderHolder.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/DecoderBuilderHolder.java
new file mode 100644
index 0000000000..9edf878059
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/DecoderBuilderHolder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * DecoderBuilderHolder
+ * 
+ */
+public class DecoderBuilderHolder {
+
+    private static final Map<String, IDecoderBuilder> builderMap = new 
ConcurrentHashMap<>();
+    static {
+        builderMap.put(CsvConfig.class.getSimpleName(), new 
CsvDecoderBuilder());
+        builderMap.put(KvConfig.class.getSimpleName(), new KvDecoderBuilder());
+        builderMap.put(PbConfig.class.getSimpleName(), new PbDecoderBuilder());
+    }
+
+    public static IDecoderBuilder getBuilder(String dataTypeConfig) {
+        IDecoderBuilder builder = builderMap.get(dataTypeConfig);
+        if (builder != null) {
+            return builder;
+        }
+        throw new IllegalArgumentException("do not support data type=" + 
dataTypeConfig);
+    }
+
+    public static void setBuilder(String dataTypeConfig, IDecoderBuilder 
builder) {
+        if (builder == null) {
+            return;
+        }
+        builderMap.put(dataTypeConfig, builder);
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/EventUtils.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/EventUtils.java
new file mode 100644
index 0000000000..a320490b49
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/EventUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sort.formats.util.StringUtils;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.sink.http.HttpIdConfig;
+import org.apache.inlong.sort.standalone.sink.http.HttpSinkContext;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * EventUtils
+ */
+public class EventUtils {
+
+    public static final String KEY_FTIME = "ftime";
+    public static final String KEY_EXTINFO = "extinfo";
+
+    public static List<Map<String, Object>> decodeTransform(HttpSinkContext 
context, ProfileEvent event,
+            TransformProcessor<String, Map<String, Object>> processor) {
+        // build
+        Map<String, Object> extParams = new ConcurrentHashMap<>();
+        extParams.putAll(context.getSinkContext().getParameters());
+        event.getHeaders().forEach((k, v) -> extParams.put(k, v));
+        List<Map<String, Object>> fieldMaps = 
processor.transformForBytes(event.getBody(), extParams);
+        return fieldMaps;
+    }
+
+    public static List<Map<String, String>> decodeKv(HttpSinkContext context, 
ProfileEvent event, HttpIdConfig idConfig,
+            KvConfig kvConfig, String strContent) {
+        List<Map<String, String>> fieldMaps = StringUtils.splitKv(strContent, 
kvConfig.getEntrySplitter(),
+                kvConfig.getKvSplitter(), kvConfig.getEscapeChar(), null, 
kvConfig.getLineSeparator(), true);
+        return fieldMaps;
+    }
+
+    public static List<Map<String, String>> decodeCsv(HttpSinkContext context, 
ProfileEvent event,
+            HttpIdConfig idConfig, CsvConfig csvConfig, String strContent) {
+        String[][] columns = StringUtils.splitCsv(strContent,
+                csvConfig.getDelimiter(),
+                csvConfig.getEscapeChar(),
+                null, '\n', false, true, null);
+        List<Map<String, String>> fieldMaps = new ArrayList<>();
+        for (int i = 0; i < columns.length; i++) {
+            String[] columnValues = columns[i];
+            // unescape
+            int valueLength = columnValues.length;
+            List<String> fieldList = idConfig.getFieldList();
+            // get field value
+            Map<String, String> fieldMap = new HashMap<>();
+            for (int columnIndex = 0; columnIndex < fieldList.size(); 
columnIndex++) {
+                String fieldName = fieldList.get(columnIndex);
+                String fieldValue = columnIndex < valueLength ? 
columnValues[columnIndex] : "";
+                fieldMap.put(fieldName, fieldValue);
+            }
+            fieldMaps.add(fieldMap);
+        }
+        return fieldMaps;
+    }
+
+    public static String prepareStringContent(HttpIdConfig idConfig, 
ProfileEvent event) {
+        byte[] bodyBytes = event.getBody();
+        String strContent = new String(bodyBytes, idConfig.getSourceCharset());
+        return strContent;
+    }
+
+    /**
+     * getExtInfo
+     * 
+     * @param  event
+     * @return
+     */
+    public static String getExtInfo(ProfileEvent event) {
+        String extinfoValue = event.getHeaders().get(KEY_EXTINFO);
+        if (extinfoValue != null) {
+            return KEY_EXTINFO + "=" + extinfoValue;
+        }
+        extinfoValue = KEY_EXTINFO + "=" + 
event.getHeaders().get(EventConstants.HEADER_KEY_SOURCE_IP);
+        return extinfoValue;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/IDecoderBuilder.java
similarity index 56%
copy from 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
copy to 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/IDecoderBuilder.java
index 0268c40821..0e04df8a86 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/IDecoderBuilder.java
@@ -15,20 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone.sink.http;
+package org.apache.inlong.sort.standalone.sink;
 
-import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
+import org.apache.inlong.sdk.transform.decode.SourceDecoder;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+public interface IDecoderBuilder {
 
-import java.net.URISyntaxException;
-import java.util.List;
-
-public interface IEvent2HttpRequestHandler {
-
-    HttpRequest parse(HttpSinkContext context, ProfileEvent event) throws 
URISyntaxException, JsonProcessingException;
-
-    List<HttpRequest> parse(HttpSinkContext context, DispatchProfile 
dispatchProfile)
-            throws URISyntaxException, JsonProcessingException;
+    SourceDecoder<String> createSourceDecoder(SourceConfig sourceConfig);
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/KvDecoderBuilder.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/KvDecoderBuilder.java
new file mode 100644
index 0000000000..d47c97e770
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/KvDecoderBuilder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.sdk.transform.decode.SourceDecoder;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class KvDecoderBuilder extends BaseDecoderBuilder {
+
+    @Override
+    public SourceDecoder<String> createSourceDecoder(SourceConfig 
sourceConfig) {
+        List<FieldInfo> fieldInfoList = sourceConfig.getFieldConfigs()
+                .stream()
+                .map(this::convertToTransformFieldInfo)
+                .collect(Collectors.toList());
+
+        DataTypeConfig dataTypeConfig = sourceConfig.getDataTypeConfig();
+        if (dataTypeConfig instanceof KvConfig) {
+            KvConfig kvConfig = (KvConfig) dataTypeConfig;
+            KvSourceInfo kvSourceInfo = KvSourceInfo.builder()
+                    .charset(sourceConfig.getEncodingType())
+                    .fields(fieldInfoList)
+                    .kvDelimiter(kvConfig.getKvSplitter())
+                    .entryDelimiter(kvConfig.getEntrySplitter())
+                    .lineDelimiter(kvConfig.getLineSeparator())
+                    .escapeChar(kvConfig.getEscapeChar())
+                    .build();
+            return SourceDecoderFactory.createKvDecoder(kvSourceInfo);
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/PbDecoderBuilder.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/PbDecoderBuilder.java
new file mode 100644
index 0000000000..dbbba87468
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/PbDecoderBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
+import org.apache.inlong.sdk.transform.decode.SourceDecoder;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
+
+public class PbDecoderBuilder extends BaseDecoderBuilder {
+
+    @Override
+    public SourceDecoder<String> createSourceDecoder(SourceConfig 
sourceConfig) {
+        DataTypeConfig dataTypeConfig = sourceConfig.getDataTypeConfig();
+        if (dataTypeConfig instanceof PbConfig) {
+            PbConfig pbConfig = (PbConfig) dataTypeConfig;
+            PbSourceInfo pbSourceInfo = new 
PbSourceInfo(sourceConfig.getEncodingType(),
+                    pbConfig.getProtoDescription(),
+                    pbConfig.getRootMessageType(),
+                    pbConfig.getRowsNodePath());
+            return SourceDecoderFactory.createPbDecoder(pbSourceInfo);
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 8ee86a1ec4..f77b2a7a50 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -21,20 +21,13 @@ import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
-import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
-import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
-import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
 import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.BasicFormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sdk.transform.decode.SourceDecoder;
-import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
-import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
-import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
-import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.TransformConfig;
 import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
@@ -54,11 +47,9 @@ import org.slf4j.Logger;
 
 import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.stream.Collectors;
 
 public class SinkContext {
 
@@ -212,42 +203,13 @@ public class SinkContext {
 
     public SourceDecoder<String> createSourceDecoder(SourceConfig 
sourceConfig) {
         DataTypeConfig dataTypeConfig = sourceConfig.getDataTypeConfig();
-        List<FieldInfo> fieldInfoList = sourceConfig.getFieldConfigs()
-                .stream()
-                .map(this::convertToTransformFieldInfo)
-                .collect(Collectors.toList());
-
-        if (dataTypeConfig instanceof CsvConfig) {
-            CsvConfig csvConfig = (CsvConfig) dataTypeConfig;
-            CsvSourceInfo csvSourceInfo = CsvSourceInfo.builder()
-                    .delimiter(csvConfig.getDelimiter())
-                    .escapeChar(csvConfig.getEscapeChar())
-                    .fields(fieldInfoList)
-                    .charset(sourceConfig.getEncodingType())
-                    .build();
-            return SourceDecoderFactory.createCsvDecoder(csvSourceInfo);
-
-        } else if (dataTypeConfig instanceof KvConfig) {
-            KvConfig kvConfig = (KvConfig) dataTypeConfig;
-            KvSourceInfo kvSourceInfo = KvSourceInfo.builder()
-                    .charset(sourceConfig.getEncodingType())
-                    .fields(fieldInfoList)
-                    .kvDelimiter(kvConfig.getKvSplitter())
-                    .entryDelimiter(kvConfig.getEntrySplitter())
-                    .lineDelimiter(kvConfig.getLineSeparator())
-                    .escapeChar(kvConfig.getEscapeChar())
-                    .build();
-            return SourceDecoderFactory.createKvDecoder(kvSourceInfo);
-        } else if (dataTypeConfig instanceof PbConfig) {
-            PbConfig pbConfig = (PbConfig) dataTypeConfig;
-            PbSourceInfo pbSourceInfo = new 
PbSourceInfo(sourceConfig.getEncodingType(),
-                    pbConfig.getProtoDescription(),
-                    pbConfig.getRootMessageType(),
-                    pbConfig.getRowsNodePath());
-            return SourceDecoderFactory.createPbDecoder(pbSourceInfo);
-        } else {
+        String dataTypeClass = dataTypeConfig.getClass().getSimpleName();
+        IDecoderBuilder builder = 
DecoderBuilderHolder.getBuilder(dataTypeClass);
+        SourceDecoder<String> decoder = 
builder.createSourceDecoder(sourceConfig);
+        if (decoder == null) {
             throw new IllegalArgumentException("do not support data type=" + 
dataTypeConfig.getClass().getName());
         }
+        return decoder;
     }
 
     public FieldInfo convertToTransformFieldInfo(FieldConfig config) {
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
index 0a3b90d924..c4b004f261 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
@@ -17,10 +17,14 @@
 
 package org.apache.inlong.sort.standalone.sink.http;
 
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.sink.EventUtils;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -33,15 +37,16 @@ import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
 import org.slf4j.Logger;
 
-import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class DefaultEvent2HttpRequestHandler implements 
IEvent2HttpRequestHandler {
 
@@ -52,32 +57,20 @@ public class DefaultEvent2HttpRequestHandler implements 
IEvent2HttpRequestHandle
     public static final String INLONG_GROUP_ID_HEADER = "inlongGroupId";
     public static final String INLONG_STREAM_ID_HEADER = "inlongStreamId";
 
+    public static final String KEY_FTIME = "ftime";
+    public static final String KEY_EXTINFO = "extinfo";
+
+    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss");
+
     @Override
-    public HttpRequest parse(HttpSinkContext context, ProfileEvent event)
+    public List<HttpRequest> parse(HttpSinkContext context, ProfileEvent event)
             throws URISyntaxException, JsonProcessingException {
         String uid = event.getUid();
         HttpIdConfig idConfig = context.getIdConfig(uid);
         if (idConfig == null) {
             return null;
         }
-        // get the delimiter
-        String delimiter = idConfig.getSeparator();
-        char cDelimiter = delimiter.charAt(0);
-        // for tab separator
-        byte[] bodyBytes = event.getBody();
-        String strContext = new String(bodyBytes, idConfig.getSourceCharset());
-        // unescape
-        List<String> columnValues = UnescapeHelper.toFiledList(strContext, 
cDelimiter);
-        int valueLength = columnValues.size();
-        List<String> fieldList = idConfig.getFieldList();
-        int columnLength = fieldList.size();
-        // get field value
-        Map<String, String> fieldMap = new HashMap<>();
-        for (int i = 0; i < columnLength; ++i) {
-            String fieldName = fieldList.get(i);
-            String fieldValue = i < valueLength ? columnValues.get(i) : "";
-            fieldMap.put(fieldName, fieldValue);
-        }
+        List<Map<String, Object>> fieldMaps = decodeEvent(context, event, 
idConfig);
 
         // build
         String uriString = context.getBaseUrl() + idConfig.getPath();
@@ -86,52 +79,110 @@ public class DefaultEvent2HttpRequestHandler implements 
IEvent2HttpRequestHandle
         }
         URI uri = new URI(uriString);
         String jsonData;
-        HttpUriRequest request;
         String requestMethod = idConfig.getMethod().toUpperCase();
+        List<HttpRequest> requests = new ArrayList<>();
         switch (requestMethod) {
             case "GET":
-                String params = fieldMap.entrySet().stream()
-                        .map(entry -> {
-                            try {
-                                return entry.getKey() + "="
-                                        + URLEncoder.encode(entry.getValue(), 
idConfig.getSinkCharset().name());
-                            } catch (UnsupportedEncodingException e) {
-                                throw new RuntimeException(e);
-                            }
-                        })
-                        .collect(Collectors.joining("&"));
-                request = new HttpGet(uri + "?" + params);
-                for (Map.Entry<String, String> entry : 
idConfig.getHeaders().entrySet()) {
-                    request.setHeader(entry.getKey(), entry.getValue());
+                for (Map<String, Object> fieldMap : fieldMaps) {
+                    List<String> columnPairs = new 
ArrayList<>(fieldMap.size());
+                    for (Entry<String, Object> entry : fieldMap.entrySet()) {
+                        String key = entry.getKey();
+                        String value = "";
+                        try {
+                            value = 
URLEncoder.encode(String.valueOf(entry.getValue()),
+                                    idConfig.getSinkCharset().name());
+                        } catch (Throwable t) {
+                        }
+                        columnPairs.add(String.join("=", key, value));
+                    }
+                    String params = String.join("&", columnPairs);
+                    HttpUriRequest request = new HttpGet(uri + "?" + params);
+                    for (Map.Entry<String, String> entry : 
idConfig.getHeaders().entrySet()) {
+                        request.setHeader(entry.getKey(), entry.getValue());
+                    }
+                    request.setHeader(INLONG_GROUP_ID_HEADER, 
idConfig.getInlongGroupId());
+                    request.setHeader(INLONG_STREAM_ID_HEADER, 
idConfig.getInlongStreamId());
+                    requests.add(new HttpRequest(request, event, null, 
idConfig.getMaxRetryTimes()));
                 }
-                request.setHeader(INLONG_GROUP_ID_HEADER, 
idConfig.getInlongGroupId());
-                request.setHeader(INLONG_STREAM_ID_HEADER, 
idConfig.getInlongStreamId());
                 break;
             case "POST":
-                request = new HttpPost(uri);
-                for (Map.Entry<String, String> entry : 
idConfig.getHeaders().entrySet()) {
-                    request.setHeader(entry.getKey(), entry.getValue());
+                for (Map<String, Object> fieldMap : fieldMaps) {
+                    HttpPost request = new HttpPost(uri);
+                    for (Map.Entry<String, String> entry : 
idConfig.getHeaders().entrySet()) {
+                        request.setHeader(entry.getKey(), entry.getValue());
+                    }
+                    request.setHeader(INLONG_GROUP_ID_HEADER, 
idConfig.getInlongGroupId());
+                    request.setHeader(INLONG_STREAM_ID_HEADER, 
idConfig.getInlongStreamId());
+                    jsonData = objectMapper.writeValueAsString(fieldMap);
+                    setEntity((HttpEntityEnclosingRequestBase) request, 
jsonData);
+                    requests.add(new HttpRequest(request, event, null, 
idConfig.getMaxRetryTimes()));
                 }
-                request.setHeader(INLONG_GROUP_ID_HEADER, 
idConfig.getInlongGroupId());
-                request.setHeader(INLONG_STREAM_ID_HEADER, 
idConfig.getInlongStreamId());
-                jsonData = objectMapper.writeValueAsString(fieldMap);
-                setEntity((HttpEntityEnclosingRequestBase) request, jsonData);
                 break;
             case "PUT":
-                request = new HttpPut(uri);
-                for (Map.Entry<String, String> entry : 
idConfig.getHeaders().entrySet()) {
-                    request.setHeader(entry.getKey(), entry.getValue());
+                for (Map<String, Object> fieldMap : fieldMaps) {
+                    HttpPut request = new HttpPut(uri);
+                    for (Map.Entry<String, String> entry : 
idConfig.getHeaders().entrySet()) {
+                        request.setHeader(entry.getKey(), entry.getValue());
+                    }
+                    request.setHeader(INLONG_GROUP_ID_HEADER, 
idConfig.getInlongGroupId());
+                    request.setHeader(INLONG_STREAM_ID_HEADER, 
idConfig.getInlongStreamId());
+                    jsonData = objectMapper.writeValueAsString(fieldMap);
+                    setEntity((HttpEntityEnclosingRequestBase) request, 
jsonData);
+                    requests.add(new HttpRequest(request, event, null, 
idConfig.getMaxRetryTimes()));
                 }
-                request.setHeader(INLONG_GROUP_ID_HEADER, 
idConfig.getInlongGroupId());
-                request.setHeader(INLONG_STREAM_ID_HEADER, 
idConfig.getInlongStreamId());
-                jsonData = objectMapper.writeValueAsString(fieldMap);
-                setEntity((HttpEntityEnclosingRequestBase) request, jsonData);
                 break;
             default:
                 LOG.error("Unsupported request method: {}", requestMethod);
                 return null;
         }
-        return new HttpRequest(request, event, null, 
idConfig.getMaxRetryTimes());
+        return requests;
+    }
+
+    private List<Map<String, Object>> decodeEvent(HttpSinkContext context, 
ProfileEvent event, HttpIdConfig idConfig) {
+        TransformProcessor<String, Map<String, Object>> processor = 
context.getTransformProcessor(event.getUid());
+        if (processor != null) {
+            return EventUtils.decodeTransform(context, event, processor);
+        }
+        // parse fields
+        String strContent = EventUtils.prepareStringContent(idConfig, event);
+        List<Map<String, String>> fieldMaps = null;
+        if (idConfig.getDataTypeConfig() == null) {
+            CsvConfig csvConfig = new CsvConfig();
+            csvConfig.setDelimiter(idConfig.getSeparator().charAt(0));
+            csvConfig.setEscapeChar('\\');
+            fieldMaps = EventUtils.decodeCsv(context, event, idConfig, 
csvConfig, strContent);
+        } else {
+            DataTypeConfig dataTypeConfig = idConfig.getDataTypeConfig();
+            if (dataTypeConfig instanceof CsvConfig) {
+                CsvConfig csvConfig = (CsvConfig) dataTypeConfig;
+                fieldMaps = EventUtils.decodeCsv(context, event, idConfig, 
csvConfig, strContent);
+            } else if (dataTypeConfig instanceof KvConfig) {
+                KvConfig kvConfig = (KvConfig) dataTypeConfig;
+                fieldMaps = EventUtils.decodeKv(context, event, idConfig, 
kvConfig, strContent);
+            } else {
+                CsvConfig csvConfig = new CsvConfig();
+                csvConfig.setDelimiter(idConfig.getSeparator().charAt(0));
+                csvConfig.setEscapeChar('\\');
+                fieldMaps = EventUtils.decodeCsv(context, event, idConfig, 
csvConfig, strContent);
+            }
+        }
+        // ftime
+        String ftime = dateFormat.format(new Date(event.getRawLogTime()));
+        // extinfo
+        String extinfo = EventUtils.getExtInfo(event);
+        List<Map<String, Object>> results = new ArrayList<>();
+        for (Map<String, String> fieldMap : fieldMaps) {
+            Map<String, Object> result = new ConcurrentHashMap<>();
+            result.putAll(fieldMap);
+            if (!result.containsKey(KEY_FTIME)) {
+                result.put(KEY_FTIME, ftime);
+            }
+
+            if (!result.containsKey(KEY_EXTINFO)) {
+                result.put(KEY_EXTINFO, extinfo);
+            }
+        }
+        return results;
     }
 
     private static void setEntity(HttpEntityEnclosingRequestBase request, 
String jsonData) {
@@ -139,12 +190,13 @@ public class DefaultEvent2HttpRequestHandler implements 
IEvent2HttpRequestHandle
         request.setEntity(requestEntity);
     }
 
+    @Override
     public List<HttpRequest> parse(HttpSinkContext context, DispatchProfile 
dispatchProfile)
             throws URISyntaxException, JsonProcessingException {
         List<HttpRequest> requests = new ArrayList<>();
         for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
-            HttpRequest request = this.parse(context, profileEvent);
-            requests.add(request);
+            List<HttpRequest> request = this.parse(context, profileEvent);
+            requests.addAll(request);
         }
         return requests;
     }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
index 2cd0e45a8d..0c762aaaad 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
@@ -52,6 +52,7 @@ public class HttpIdConfig extends IdConfig {
     private Map<String, String> headers;
     private Integer maxRetryTimes;
     private String separator;
+    private DataTypeConfig dataTypeConfig;
     private List<String> fieldList;
     private Charset sourceCharset;
     private Charset sinkCharset;
@@ -96,6 +97,7 @@ public class HttpIdConfig extends IdConfig {
                 .fieldList(fields)
                 .sinkCharset(sinkCharset)
                 .sourceCharset(sourceCharset)
+                .dataTypeConfig(dataTypeConfig)
                 .dataFlowConfig(dataFlowConfig)
                 .build();
     }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
index adc1d30089..1e9fbf16bf 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
@@ -19,8 +19,16 @@ package org.apache.inlong.sort.standalone.sink.http;
 
 import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
 import org.apache.inlong.common.pojo.sort.TaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.HttpSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
 import org.apache.inlong.common.pojo.sort.node.HttpNodeConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sdk.transform.encode.MapSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
@@ -38,6 +46,8 @@ import 
org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import lombok.Getter;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -97,6 +107,9 @@ public class HttpSinkContext extends SinkContext {
     private int maxRedirects = DEFAULT_MAX_REDIRECTS;
     private int logMaxLength = DEFAULT_LOG_MAX_LENGTH;
 
+    @Getter
+    protected Map<String, TransformProcessor<String, Map<String, Object>>> 
transformMap = new ConcurrentHashMap<>();
+
     public HttpSinkContext(String sinkName, Context context, Channel channel,
             BufferQueue<DispatchProfile> dispatchQueue) {
         super(sinkName, context, channel);
@@ -131,8 +144,11 @@ public class HttpSinkContext extends SinkContext {
             // change current config
             Map<String, HttpIdConfig> fromTaskConfig = 
reloadIdParamsFromTaskConfig(taskConfig);
             Map<String, HttpIdConfig> fromSortTaskConfig = 
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
+            Map<String, TransformProcessor<String, Map<String, Object>>> 
transformProcessor =
+                    reloadTransform(taskConfig);
             if (unifiedConfiguration) {
                 idConfigMap = fromTaskConfig;
+                transformMap = transformProcessor;
                 reloadClientsFromNodeConfig(httpNodeConfig);
             } else {
                 idConfigMap = fromSortTaskConfig;
@@ -147,6 +163,54 @@ public class HttpSinkContext extends SinkContext {
         }
     }
 
+    private Map<String, TransformProcessor<String, Map<String, Object>>> 
reloadTransform(TaskConfig taskConfig) {
+        ImmutableMap.Builder<String, TransformProcessor<String, Map<String, 
Object>>> builder =
+                new ImmutableMap.Builder<>();
+
+        taskConfig.getClusterTagConfigs()
+                .stream()
+                .map(ClusterTagConfig::getDataFlowConfigs)
+                .flatMap(Collection::stream)
+                .forEach(flow -> {
+                    TransformProcessor<String, Map<String, Object>> 
transformProcessor =
+                            createTransform(flow);
+                    if (transformProcessor == null) {
+                        return;
+                    }
+                    builder.put(InlongId.generateUid(flow.getInlongGroupId(), 
flow.getInlongStreamId()),
+                            transformProcessor);
+                });
+
+        return builder.build();
+    }
+
+    private TransformProcessor<String, Map<String, Object>> 
createTransform(DataFlowConfig dataFlowConfig) {
+        try {
+            return TransformProcessor.create(
+                    createTransformConfig(dataFlowConfig),
+                    createSourceDecoder(dataFlowConfig.getSourceConfig()),
+                    createHttpSinkEncoder(dataFlowConfig.getSinkConfig()));
+        } catch (Exception e) {
+            LOG.error("failed to reload transform of dataflow={}, ex={}", 
dataFlowConfig.getDataflowId(),
+                    e.getMessage());
+            return null;
+        }
+    }
+
+    private MapSinkEncoder createHttpSinkEncoder(SinkConfig config) {
+        if (!(config instanceof HttpSinkConfig)) {
+            throw new IllegalArgumentException("sinkInfo must be an instance 
of HttpSinkInfo");
+        }
+        HttpSinkConfig sinkConfig = (HttpSinkConfig) config;
+        List<FieldInfo> fieldInfos = sinkConfig.getFieldConfigs()
+                .stream()
+                .map(conf -> new FieldInfo(conf.getName(), 
deriveTypeConverter(conf.getFormatInfo())))
+                .collect(Collectors.toList());
+
+        MapSinkInfo sinkInfo = new MapSinkInfo(sinkConfig.getEncodingType(), 
fieldInfos);
+        return SinkEncoderFactory.createMapEncoder(sinkInfo);
+    }
+
     private Map<String, HttpIdConfig> reloadIdParamsFromTaskConfig(TaskConfig 
taskConfig) {
         if (taskConfig == null) {
             return new HashMap<>();
@@ -401,4 +465,7 @@ public class HttpSinkContext extends SinkContext {
         return null;
     }
 
+    public TransformProcessor<String, Map<String, Object>> 
getTransformProcessor(String uid) {
+        return this.transformMap.get(uid);
+    }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
index 0268c40821..d1403e2807 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
@@ -27,7 +27,8 @@ import java.util.List;
 
 public interface IEvent2HttpRequestHandler {
 
-    HttpRequest parse(HttpSinkContext context, ProfileEvent event) throws 
URISyntaxException, JsonProcessingException;
+    List<HttpRequest> parse(HttpSinkContext context, ProfileEvent event)
+            throws URISyntaxException, JsonProcessingException;
 
     List<HttpRequest> parse(HttpSinkContext context, DispatchProfile 
dispatchProfile)
             throws URISyntaxException, JsonProcessingException;


Reply via email to