This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new eb4b9c0c26 [INLONG-11937][Sort] Allow SortHttp to filter out data in
TransformFunction (#11938)
eb4b9c0c26 is described below
commit eb4b9c0c2605ac891c67201ce63861038d31e2ef
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jul 21 11:18:08 2025 +0800
[INLONG-11937][Sort] Allow SortHttp to filter out data in TransformFunction
(#11938)
---
.../inlong/sdk/transform/decode/KvUtils.java | 2 -
.../sort/standalone/sink/BaseDecoderBuilder.java | 39 +++++
.../sort/standalone/sink/CsvDecoderBuilder.java | 53 +++++++
.../sort/standalone/sink/DecoderBuilderHolder.java | 54 +++++++
.../inlong/sort/standalone/sink/EventUtils.java | 104 +++++++++++++
...ttpRequestHandler.java => IDecoderBuilder.java} | 18 +--
.../sort/standalone/sink/KvDecoderBuilder.java | 55 +++++++
.../sort/standalone/sink/PbDecoderBuilder.java | 42 ++++++
.../inlong/sort/standalone/sink/SinkContext.java | 48 +-----
.../sink/http/DefaultEvent2HttpRequestHandler.java | 164 ++++++++++++++-------
.../sort/standalone/sink/http/HttpIdConfig.java | 2 +
.../sort/standalone/sink/http/HttpSinkContext.java | 67 +++++++++
.../sink/http/IEvent2HttpRequestHandler.java | 3 +-
13 files changed, 536 insertions(+), 115 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
index 60c804c48f..4719d7cf68 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
@@ -115,8 +115,6 @@ public class KvUtils {
state = STATE_VALUE;
break;
case STATE_VALUE:
- // throw new IllegalArgumentException("Unexpected
token " +
- // ch + " at position " + i + ".");
case STATE_ESCAPING:
stringBuilder.append(ch);
state = kvState;
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/BaseDecoderBuilder.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/BaseDecoderBuilder.java
new file mode 100644
index 0000000000..fb96bffc96
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/BaseDecoderBuilder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import
org.apache.inlong.common.pojo.sort.dataflow.field.format.BasicFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
+
+/**
+ * Common base for decoder builders. It turns sort-standalone field configuration
+ * into the field descriptors consumed by the transform SDK.
+ */
+public abstract class BaseDecoderBuilder implements IDecoderBuilder {
+
+    /**
+     * Builds a transform {@link FieldInfo} from a field configuration.
+     *
+     * @param config field configuration supplying the name and the format info
+     * @return a field descriptor carrying the configured name and the converter
+     *         derived from the configured format info
+     */
+    public FieldInfo convertToTransformFieldInfo(FieldConfig config) {
+        TypeConverter converter = deriveTypeConverter(config.getFormatInfo());
+        return new FieldInfo(config.getName(), converter);
+    }
+
+    /**
+     * Chooses the converter applied to a raw field value.
+     *
+     * @param formatInfo format description of the field, may be {@code null}
+     * @return a converter that delegates to {@link BasicFormatInfo#deserialize} when
+     *         {@code formatInfo} is a {@link BasicFormatInfo}; otherwise a converter
+     *         that returns the value unchanged
+     */
+    public TypeConverter deriveTypeConverter(FormatInfo formatInfo) {
+        if (!(formatInfo instanceof BasicFormatInfo)) {
+            // Any other (or missing) format: hand the value through untouched.
+            return value -> value;
+        }
+        return value -> ((BasicFormatInfo<?>) formatInfo).deserialize(value);
+    }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/CsvDecoderBuilder.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/CsvDecoderBuilder.java
new file mode 100644
index 0000000000..b42a7ec825
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/CsvDecoderBuilder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.sdk.transform.decode.SourceDecoder;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Decoder builder for sources whose data type config is a {@link CsvConfig}.
+ */
+public class CsvDecoderBuilder extends BaseDecoderBuilder {
+
+    /**
+     * Creates a CSV source decoder from the given source configuration.
+     *
+     * @param sourceConfig source configuration providing fields, encoding and data type config
+     * @return the CSV decoder, or {@code null} when the data type config is not a {@link CsvConfig}
+     */
+    @Override
+    public SourceDecoder<String> createSourceDecoder(SourceConfig sourceConfig) {
+        // The field descriptors are resolved before the data type is inspected.
+        List<FieldInfo> fields = sourceConfig.getFieldConfigs()
+                .stream()
+                .map(fieldConfig -> convertToTransformFieldInfo(fieldConfig))
+                .collect(Collectors.toList());
+
+        DataTypeConfig typeConfig = sourceConfig.getDataTypeConfig();
+        if (!(typeConfig instanceof CsvConfig)) {
+            return null;
+        }
+        CsvConfig csv = (CsvConfig) typeConfig;
+        CsvSourceInfo sourceInfo = CsvSourceInfo.builder()
+                .delimiter(csv.getDelimiter())
+                .escapeChar(csv.getEscapeChar())
+                .fields(fields)
+                .charset(sourceConfig.getEncodingType())
+                .build();
+        return SourceDecoderFactory.createCsvDecoder(sourceInfo);
+    }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/DecoderBuilderHolder.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/DecoderBuilderHolder.java
new file mode 100644
index 0000000000..9edf878059
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/DecoderBuilderHolder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry of {@link IDecoderBuilder} instances keyed by a data type name.
+ * <p>
+ * The CSV, KV and PB builders are pre-registered under the simple class names of
+ * {@link CsvConfig}, {@link KvConfig} and {@link PbConfig}.
+ */
+public class DecoderBuilderHolder {
+
+    private static final Map<String, IDecoderBuilder> builderMap = new ConcurrentHashMap<>();
+
+    static {
+        builderMap.put(CsvConfig.class.getSimpleName(), new CsvDecoderBuilder());
+        builderMap.put(KvConfig.class.getSimpleName(), new KvDecoderBuilder());
+        builderMap.put(PbConfig.class.getSimpleName(), new PbDecoderBuilder());
+    }
+
+    /**
+     * Looks up the builder registered for a data type name.
+     *
+     * @param dataTypeConfig key the builder was registered under
+     * @return the registered builder
+     * @throws IllegalArgumentException when no builder is registered under that key
+     */
+    public static IDecoderBuilder getBuilder(String dataTypeConfig) {
+        IDecoderBuilder registered = builderMap.get(dataTypeConfig);
+        if (registered == null) {
+            throw new IllegalArgumentException("do not support data type=" + dataTypeConfig);
+        }
+        return registered;
+    }
+
+    /**
+     * Registers a builder under the given key, replacing any previous one.
+     * A {@code null} builder is ignored.
+     *
+     * @param dataTypeConfig key to register the builder under
+     * @param builder builder to register, ignored when {@code null}
+     */
+    public static void setBuilder(String dataTypeConfig, IDecoderBuilder builder) {
+        if (builder != null) {
+            builderMap.put(dataTypeConfig, builder);
+        }
+    }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/EventUtils.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/EventUtils.java
new file mode 100644
index 0000000000..a320490b49
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/EventUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sort.formats.util.StringUtils;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.sink.http.HttpIdConfig;
+import org.apache.inlong.sort.standalone.sink.http.HttpSinkContext;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Static helpers that turn a {@link ProfileEvent} body into field maps and
+ * derive auxiliary values from the event headers.
+ */
+public class EventUtils {
+
+    public static final String KEY_FTIME = "ftime";
+    public static final String KEY_EXTINFO = "extinfo";
+
+    /**
+     * Runs the event body through a transform processor.
+     * <p>
+     * The extra parameters handed to the processor are the sink context parameters
+     * overlaid with the event headers, so a header wins over a parameter of the same name.
+     *
+     * @param context sink context supplying the base parameters
+     * @param event event whose body and headers are transformed
+     * @param processor transform processor to apply
+     * @return the field maps produced by the processor
+     */
+    public static List<Map<String, Object>> decodeTransform(HttpSinkContext context, ProfileEvent event,
+            TransformProcessor<String, Map<String, Object>> processor) {
+        Map<String, Object> extParams = new ConcurrentHashMap<>();
+        extParams.putAll(context.getSinkContext().getParameters());
+        extParams.putAll(event.getHeaders());
+        return processor.transformForBytes(event.getBody(), extParams);
+    }
+
+    /**
+     * Splits key-value text into field maps using the splitters, escape character and
+     * line separator of the KV config.
+     * <p>
+     * {@code context}, {@code event} and {@code idConfig} are not read by this method.
+     *
+     * @param kvConfig KV format settings
+     * @param strContent text to split
+     * @return the maps returned by {@code StringUtils.splitKv}
+     */
+    public static List<Map<String, String>> decodeKv(HttpSinkContext context, ProfileEvent event,
+            HttpIdConfig idConfig, KvConfig kvConfig, String strContent) {
+        return StringUtils.splitKv(strContent, kvConfig.getEntrySplitter(),
+                kvConfig.getKvSplitter(), kvConfig.getEscapeChar(), null, kvConfig.getLineSeparator(), true);
+    }
+
+    /**
+     * Splits CSV text into rows and maps each row onto the configured field list.
+     * <p>
+     * Fields beyond the end of a row get the empty string; columns beyond the end of the
+     * field list are dropped. {@code context} and {@code event} are not read by this method.
+     *
+     * @param idConfig supplies the ordered field names
+     * @param csvConfig supplies the delimiter and the escape character
+     * @param strContent text to split
+     * @return one field map per row
+     */
+    public static List<Map<String, String>> decodeCsv(HttpSinkContext context, ProfileEvent event,
+            HttpIdConfig idConfig, CsvConfig csvConfig, String strContent) {
+        String[][] rows = StringUtils.splitCsv(strContent,
+                csvConfig.getDelimiter(),
+                csvConfig.getEscapeChar(),
+                null, '\n', false, true, null);
+        List<Map<String, String>> fieldMaps = new ArrayList<>();
+        for (String[] row : rows) {
+            int available = row.length;
+            List<String> fieldNames = idConfig.getFieldList();
+            Map<String, String> fieldMap = new HashMap<>();
+            int position = 0;
+            for (String fieldName : fieldNames) {
+                // Pad short rows with empty strings.
+                String fieldValue = "";
+                if (position < available) {
+                    fieldValue = row[position];
+                }
+                fieldMap.put(fieldName, fieldValue);
+                position++;
+            }
+            fieldMaps.add(fieldMap);
+        }
+        return fieldMaps;
+    }
+
+    /**
+     * Decodes the event body into text using the source charset of the id config.
+     *
+     * @param idConfig supplies the source charset
+     * @param event event whose body is decoded
+     * @return the body as a string
+     */
+    public static String prepareStringContent(HttpIdConfig idConfig, ProfileEvent event) {
+        return new String(event.getBody(), idConfig.getSourceCharset());
+    }
+
+    /**
+     * Builds the {@code extinfo=<value>} pair for an event.
+     * <p>
+     * The value is the {@code extinfo} header when present, otherwise the source IP header.
+     * When neither header is set the value is the text {@code null}.
+     *
+     * @param event event whose headers are read
+     * @return the text {@code extinfo=} followed by the chosen header value
+     */
+    public static String getExtInfo(ProfileEvent event) {
+        String value = event.getHeaders().get(KEY_EXTINFO);
+        if (value == null) {
+            value = event.getHeaders().get(EventConstants.HEADER_KEY_SOURCE_IP);
+        }
+        return KEY_EXTINFO + "=" + value;
+    }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/IDecoderBuilder.java
similarity index 56%
copy from
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
copy to
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/IDecoderBuilder.java
index 0268c40821..0e04df8a86 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/IDecoderBuilder.java
@@ -15,20 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone.sink.http;
+package org.apache.inlong.sort.standalone.sink;
-import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
+import org.apache.inlong.sdk.transform.decode.SourceDecoder;
-import com.fasterxml.jackson.core.JsonProcessingException;
+public interface IDecoderBuilder {
-import java.net.URISyntaxException;
-import java.util.List;
-
-public interface IEvent2HttpRequestHandler {
-
- HttpRequest parse(HttpSinkContext context, ProfileEvent event) throws
URISyntaxException, JsonProcessingException;
-
- List<HttpRequest> parse(HttpSinkContext context, DispatchProfile
dispatchProfile)
- throws URISyntaxException, JsonProcessingException;
+ SourceDecoder<String> createSourceDecoder(SourceConfig sourceConfig);
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/KvDecoderBuilder.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/KvDecoderBuilder.java
new file mode 100644
index 0000000000..d47c97e770
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/KvDecoderBuilder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.sdk.transform.decode.SourceDecoder;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Decoder builder for sources whose data type config is a {@link KvConfig}.
+ */
+public class KvDecoderBuilder extends BaseDecoderBuilder {
+
+    /**
+     * Creates a key-value source decoder from the given source configuration.
+     *
+     * @param sourceConfig source configuration providing fields, encoding and data type config
+     * @return the KV decoder, or {@code null} when the data type config is not a {@link KvConfig}
+     */
+    @Override
+    public SourceDecoder<String> createSourceDecoder(SourceConfig sourceConfig) {
+        // The field descriptors are resolved before the data type is inspected.
+        List<FieldInfo> fields = sourceConfig.getFieldConfigs()
+                .stream()
+                .map(fieldConfig -> convertToTransformFieldInfo(fieldConfig))
+                .collect(Collectors.toList());
+
+        DataTypeConfig typeConfig = sourceConfig.getDataTypeConfig();
+        if (!(typeConfig instanceof KvConfig)) {
+            return null;
+        }
+        KvConfig kv = (KvConfig) typeConfig;
+        KvSourceInfo sourceInfo = KvSourceInfo.builder()
+                .charset(sourceConfig.getEncodingType())
+                .fields(fields)
+                .kvDelimiter(kv.getKvSplitter())
+                .entryDelimiter(kv.getEntrySplitter())
+                .lineDelimiter(kv.getLineSeparator())
+                .escapeChar(kv.getEscapeChar())
+                .build();
+        return SourceDecoderFactory.createKvDecoder(sourceInfo);
+    }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/PbDecoderBuilder.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/PbDecoderBuilder.java
new file mode 100644
index 0000000000..dbbba87468
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/PbDecoderBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
+import org.apache.inlong.sdk.transform.decode.SourceDecoder;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
+
+/**
+ * Decoder builder for sources whose data type config is a {@link PbConfig}.
+ */
+public class PbDecoderBuilder extends BaseDecoderBuilder {
+
+    /**
+     * Creates a protobuf source decoder from the given source configuration.
+     *
+     * @param sourceConfig source configuration providing the encoding and data type config
+     * @return the PB decoder, or {@code null} when the data type config is not a {@link PbConfig}
+     */
+    @Override
+    public SourceDecoder<String> createSourceDecoder(SourceConfig sourceConfig) {
+        DataTypeConfig typeConfig = sourceConfig.getDataTypeConfig();
+        if (!(typeConfig instanceof PbConfig)) {
+            return null;
+        }
+        PbConfig pb = (PbConfig) typeConfig;
+        PbSourceInfo sourceInfo = new PbSourceInfo(
+                sourceConfig.getEncodingType(),
+                pb.getProtoDescription(),
+                pb.getRootMessageType(),
+                pb.getRowsNodePath());
+        return SourceDecoderFactory.createPbDecoder(sourceInfo);
+    }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 8ee86a1ec4..f77b2a7a50 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -21,20 +21,13 @@ import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
-import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
-import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
-import org.apache.inlong.common.pojo.sort.dataflow.dataType.PbConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import
org.apache.inlong.common.pojo.sort.dataflow.field.format.BasicFormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sdk.transform.decode.SourceDecoder;
-import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
-import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
-import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
-import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
@@ -54,11 +47,9 @@ import org.slf4j.Logger;
import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.stream.Collectors;
public class SinkContext {
@@ -212,42 +203,13 @@ public class SinkContext {
public SourceDecoder<String> createSourceDecoder(SourceConfig
sourceConfig) {
DataTypeConfig dataTypeConfig = sourceConfig.getDataTypeConfig();
- List<FieldInfo> fieldInfoList = sourceConfig.getFieldConfigs()
- .stream()
- .map(this::convertToTransformFieldInfo)
- .collect(Collectors.toList());
-
- if (dataTypeConfig instanceof CsvConfig) {
- CsvConfig csvConfig = (CsvConfig) dataTypeConfig;
- CsvSourceInfo csvSourceInfo = CsvSourceInfo.builder()
- .delimiter(csvConfig.getDelimiter())
- .escapeChar(csvConfig.getEscapeChar())
- .fields(fieldInfoList)
- .charset(sourceConfig.getEncodingType())
- .build();
- return SourceDecoderFactory.createCsvDecoder(csvSourceInfo);
-
- } else if (dataTypeConfig instanceof KvConfig) {
- KvConfig kvConfig = (KvConfig) dataTypeConfig;
- KvSourceInfo kvSourceInfo = KvSourceInfo.builder()
- .charset(sourceConfig.getEncodingType())
- .fields(fieldInfoList)
- .kvDelimiter(kvConfig.getKvSplitter())
- .entryDelimiter(kvConfig.getEntrySplitter())
- .lineDelimiter(kvConfig.getLineSeparator())
- .escapeChar(kvConfig.getEscapeChar())
- .build();
- return SourceDecoderFactory.createKvDecoder(kvSourceInfo);
- } else if (dataTypeConfig instanceof PbConfig) {
- PbConfig pbConfig = (PbConfig) dataTypeConfig;
- PbSourceInfo pbSourceInfo = new
PbSourceInfo(sourceConfig.getEncodingType(),
- pbConfig.getProtoDescription(),
- pbConfig.getRootMessageType(),
- pbConfig.getRowsNodePath());
- return SourceDecoderFactory.createPbDecoder(pbSourceInfo);
- } else {
+ String dataTypeClass = dataTypeConfig.getClass().getSimpleName();
+ IDecoderBuilder builder =
DecoderBuilderHolder.getBuilder(dataTypeClass);
+ SourceDecoder<String> decoder =
builder.createSourceDecoder(sourceConfig);
+ if (decoder == null) {
throw new IllegalArgumentException("do not support data type=" +
dataTypeConfig.getClass().getName());
}
+ return decoder;
}
public FieldInfo convertToTransformFieldInfo(FieldConfig config) {
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
index 0a3b90d924..c4b004f261 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java
@@ -17,10 +17,14 @@
package org.apache.inlong.sort.standalone.sink.http;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
+import org.apache.inlong.sort.standalone.sink.EventUtils;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -33,15 +37,16 @@ import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.slf4j.Logger;
-import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
public class DefaultEvent2HttpRequestHandler implements
IEvent2HttpRequestHandler {
@@ -52,32 +57,20 @@ public class DefaultEvent2HttpRequestHandler implements
IEvent2HttpRequestHandle
public static final String INLONG_GROUP_ID_HEADER = "inlongGroupId";
public static final String INLONG_STREAM_ID_HEADER = "inlongStreamId";
+ public static final String KEY_FTIME = "ftime";
+ public static final String KEY_EXTINFO = "extinfo";
+
+ private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss");
+
@Override
- public HttpRequest parse(HttpSinkContext context, ProfileEvent event)
+ public List<HttpRequest> parse(HttpSinkContext context, ProfileEvent event)
throws URISyntaxException, JsonProcessingException {
String uid = event.getUid();
HttpIdConfig idConfig = context.getIdConfig(uid);
if (idConfig == null) {
return null;
}
- // get the delimiter
- String delimiter = idConfig.getSeparator();
- char cDelimiter = delimiter.charAt(0);
- // for tab separator
- byte[] bodyBytes = event.getBody();
- String strContext = new String(bodyBytes, idConfig.getSourceCharset());
- // unescape
- List<String> columnValues = UnescapeHelper.toFiledList(strContext,
cDelimiter);
- int valueLength = columnValues.size();
- List<String> fieldList = idConfig.getFieldList();
- int columnLength = fieldList.size();
- // get field value
- Map<String, String> fieldMap = new HashMap<>();
- for (int i = 0; i < columnLength; ++i) {
- String fieldName = fieldList.get(i);
- String fieldValue = i < valueLength ? columnValues.get(i) : "";
- fieldMap.put(fieldName, fieldValue);
- }
+ List<Map<String, Object>> fieldMaps = decodeEvent(context, event,
idConfig);
// build
String uriString = context.getBaseUrl() + idConfig.getPath();
@@ -86,52 +79,110 @@ public class DefaultEvent2HttpRequestHandler implements
IEvent2HttpRequestHandle
}
URI uri = new URI(uriString);
String jsonData;
- HttpUriRequest request;
String requestMethod = idConfig.getMethod().toUpperCase();
+ List<HttpRequest> requests = new ArrayList<>();
switch (requestMethod) {
case "GET":
- String params = fieldMap.entrySet().stream()
- .map(entry -> {
- try {
- return entry.getKey() + "="
- + URLEncoder.encode(entry.getValue(),
idConfig.getSinkCharset().name());
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- })
- .collect(Collectors.joining("&"));
- request = new HttpGet(uri + "?" + params);
- for (Map.Entry<String, String> entry :
idConfig.getHeaders().entrySet()) {
- request.setHeader(entry.getKey(), entry.getValue());
+ for (Map<String, Object> fieldMap : fieldMaps) {
+ List<String> columnPairs = new
ArrayList<>(fieldMap.size());
+ for (Entry<String, Object> entry : fieldMap.entrySet()) {
+ String key = entry.getKey();
+ String value = "";
+ try {
+ value =
URLEncoder.encode(String.valueOf(entry.getValue()),
+ idConfig.getSinkCharset().name());
+ } catch (Throwable t) {
+ }
+ columnPairs.add(String.join("=", key, value));
+ }
+ String params = String.join("&", columnPairs);
+ HttpUriRequest request = new HttpGet(uri + "?" + params);
+ for (Map.Entry<String, String> entry :
idConfig.getHeaders().entrySet()) {
+ request.setHeader(entry.getKey(), entry.getValue());
+ }
+ request.setHeader(INLONG_GROUP_ID_HEADER,
idConfig.getInlongGroupId());
+ request.setHeader(INLONG_STREAM_ID_HEADER,
idConfig.getInlongStreamId());
+ requests.add(new HttpRequest(request, event, null,
idConfig.getMaxRetryTimes()));
}
- request.setHeader(INLONG_GROUP_ID_HEADER,
idConfig.getInlongGroupId());
- request.setHeader(INLONG_STREAM_ID_HEADER,
idConfig.getInlongStreamId());
break;
case "POST":
- request = new HttpPost(uri);
- for (Map.Entry<String, String> entry :
idConfig.getHeaders().entrySet()) {
- request.setHeader(entry.getKey(), entry.getValue());
+ for (Map<String, Object> fieldMap : fieldMaps) {
+ HttpPost request = new HttpPost(uri);
+ for (Map.Entry<String, String> entry :
idConfig.getHeaders().entrySet()) {
+ request.setHeader(entry.getKey(), entry.getValue());
+ }
+ request.setHeader(INLONG_GROUP_ID_HEADER,
idConfig.getInlongGroupId());
+ request.setHeader(INLONG_STREAM_ID_HEADER,
idConfig.getInlongStreamId());
+ jsonData = objectMapper.writeValueAsString(fieldMap);
+ setEntity((HttpEntityEnclosingRequestBase) request,
jsonData);
+ requests.add(new HttpRequest(request, event, null,
idConfig.getMaxRetryTimes()));
}
- request.setHeader(INLONG_GROUP_ID_HEADER,
idConfig.getInlongGroupId());
- request.setHeader(INLONG_STREAM_ID_HEADER,
idConfig.getInlongStreamId());
- jsonData = objectMapper.writeValueAsString(fieldMap);
- setEntity((HttpEntityEnclosingRequestBase) request, jsonData);
break;
case "PUT":
- request = new HttpPut(uri);
- for (Map.Entry<String, String> entry :
idConfig.getHeaders().entrySet()) {
- request.setHeader(entry.getKey(), entry.getValue());
+ for (Map<String, Object> fieldMap : fieldMaps) {
+ HttpPut request = new HttpPut(uri);
+ for (Map.Entry<String, String> entry :
idConfig.getHeaders().entrySet()) {
+ request.setHeader(entry.getKey(), entry.getValue());
+ }
+ request.setHeader(INLONG_GROUP_ID_HEADER,
idConfig.getInlongGroupId());
+ request.setHeader(INLONG_STREAM_ID_HEADER,
idConfig.getInlongStreamId());
+ jsonData = objectMapper.writeValueAsString(fieldMap);
+ setEntity((HttpEntityEnclosingRequestBase) request,
jsonData);
+ requests.add(new HttpRequest(request, event, null,
idConfig.getMaxRetryTimes()));
}
- request.setHeader(INLONG_GROUP_ID_HEADER,
idConfig.getInlongGroupId());
- request.setHeader(INLONG_STREAM_ID_HEADER,
idConfig.getInlongStreamId());
- jsonData = objectMapper.writeValueAsString(fieldMap);
- setEntity((HttpEntityEnclosingRequestBase) request, jsonData);
break;
default:
LOG.error("Unsupported request method: {}", requestMethod);
return null;
}
- return new HttpRequest(request, event, null,
idConfig.getMaxRetryTimes());
+ return requests;
+ }
+
+    /**
+     * Decodes one event into field maps, one map per record found in the event body.
+     * <p>
+     * When a transform processor is registered for the event uid, its output is returned
+     * as is. Otherwise the body is decoded as text and split according to the data type
+     * config of {@code idConfig}: KV for a {@link KvConfig}, CSV for a {@link CsvConfig},
+     * and for anything else (including no config) CSV with the first character of the
+     * configured separator as delimiter and a backslash as escape character. On that
+     * second path every record additionally receives {@code ftime} and {@code extinfo}
+     * entries unless the record already contains those keys.
+     *
+     * @param context sink context used to look up the transform processor
+     * @param event event to decode
+     * @param idConfig id configuration of the event uid
+     * @return the decoded records
+     */
+    private List<Map<String, Object>> decodeEvent(HttpSinkContext context, ProfileEvent event,
+            HttpIdConfig idConfig) {
+        TransformProcessor<String, Map<String, Object>> processor =
+                context.getTransformProcessor(event.getUid());
+        if (processor != null) {
+            return EventUtils.decodeTransform(context, event, processor);
+        }
+        // parse fields
+        String strContent = EventUtils.prepareStringContent(idConfig, event);
+        DataTypeConfig dataTypeConfig = idConfig.getDataTypeConfig();
+        List<Map<String, String>> fieldMaps;
+        if (dataTypeConfig instanceof CsvConfig) {
+            fieldMaps = EventUtils.decodeCsv(context, event, idConfig, (CsvConfig) dataTypeConfig, strContent);
+        } else if (dataTypeConfig instanceof KvConfig) {
+            fieldMaps = EventUtils.decodeKv(context, event, idConfig, (KvConfig) dataTypeConfig, strContent);
+        } else {
+            // No or unknown data type config: fall back to separator-based CSV.
+            CsvConfig csvConfig = new CsvConfig();
+            csvConfig.setDelimiter(idConfig.getSeparator().charAt(0));
+            csvConfig.setEscapeChar('\\');
+            fieldMaps = EventUtils.decodeCsv(context, event, idConfig, csvConfig, strContent);
+        }
+        // ftime
+        // NOTE(review): dateFormat is a shared SimpleDateFormat, which is not thread-safe;
+        // confirm this handler is never invoked from several threads at once.
+        String ftime = dateFormat.format(new Date(event.getRawLogTime()));
+        // extinfo
+        String extinfo = EventUtils.getExtInfo(event);
+        List<Map<String, Object>> results = new ArrayList<>(fieldMaps.size());
+        for (Map<String, String> fieldMap : fieldMaps) {
+            Map<String, Object> result = new ConcurrentHashMap<>();
+            result.putAll(fieldMap);
+            // Values decoded from the body take precedence over the derived ones.
+            result.putIfAbsent(KEY_FTIME, ftime);
+            result.putIfAbsent(KEY_EXTINFO, extinfo);
+            // Without this add the method would always return an empty list.
+            results.add(result);
+        }
+        return results;
+    }
private static void setEntity(HttpEntityEnclosingRequestBase request,
String jsonData) {
@@ -139,12 +190,13 @@ public class DefaultEvent2HttpRequestHandler implements
IEvent2HttpRequestHandle
request.setEntity(requestEntity);
}
+ /**
+  * Converts every event of a dispatch profile into HTTP requests.
+  * Each event is handed to the single-event parse, which returns a list, so one event can
+  * contribute any number of requests to the combined result.
+  *
+  * @param context sink context passed through to the single-event parse
+  * @param dispatchProfile profile whose events are converted
+  * @return the requests of all events, in event iteration order
+  */
+ @Override
 public List<HttpRequest> parse(HttpSinkContext context, DispatchProfile
dispatchProfile)
 throws URISyntaxException, JsonProcessingException {
 List<HttpRequest> requests = new ArrayList<>();
 for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
- HttpRequest request = this.parse(context, profileEvent);
- requests.add(request);
+ // the single-event parse now yields a list, so its elements are appended in bulk
+ List<HttpRequest> request = this.parse(context, profileEvent);
+ requests.addAll(request);
 }
 return requests;
 }
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
index 2cd0e45a8d..0c762aaaad 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java
@@ -52,6 +52,7 @@ public class HttpIdConfig extends IdConfig {
private Map<String, String> headers;
private Integer maxRetryTimes;
private String separator;
+ private DataTypeConfig dataTypeConfig;
private List<String> fieldList;
private Charset sourceCharset;
private Charset sinkCharset;
@@ -96,6 +97,7 @@ public class HttpIdConfig extends IdConfig {
.fieldList(fields)
.sinkCharset(sinkCharset)
.sourceCharset(sourceCharset)
+ .dataTypeConfig(dataTypeConfig)
.dataFlowConfig(dataFlowConfig)
.build();
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
index adc1d30089..1e9fbf16bf 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java
@@ -19,8 +19,16 @@ package org.apache.inlong.sort.standalone.sink.http;
import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.TaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.HttpSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
import org.apache.inlong.common.pojo.sort.node.HttpNodeConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sdk.transform.encode.MapSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
@@ -38,6 +46,8 @@ import
org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import lombok.Getter;
import org.apache.commons.lang3.ClassUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -97,6 +107,9 @@ public class HttpSinkContext extends SinkContext {
private int maxRedirects = DEFAULT_MAX_REDIRECTS;
private int logMaxLength = DEFAULT_LOG_MAX_LENGTH;
+ // Transform processors keyed by uid; starts empty and is replaced as a whole by reload()
+ // with the map built in reloadTransform when unifiedConfiguration is set.
+ @Getter
+ protected Map<String, TransformProcessor<String, Map<String, Object>>>
transformMap = new ConcurrentHashMap<>();
+
public HttpSinkContext(String sinkName, Context context, Channel channel,
BufferQueue<DispatchProfile> dispatchQueue) {
super(sinkName, context, channel);
@@ -131,8 +144,11 @@ public class HttpSinkContext extends SinkContext {
// change current config
Map<String, HttpIdConfig> fromTaskConfig =
reloadIdParamsFromTaskConfig(taskConfig);
Map<String, HttpIdConfig> fromSortTaskConfig =
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
+ Map<String, TransformProcessor<String, Map<String, Object>>>
transformProcessor =
+ reloadTransform(taskConfig);
if (unifiedConfiguration) {
idConfigMap = fromTaskConfig;
+ transformMap = transformProcessor;
reloadClientsFromNodeConfig(httpNodeConfig);
} else {
idConfigMap = fromSortTaskConfig;
@@ -147,6 +163,54 @@ public class HttpSinkContext extends SinkContext {
}
}
+    /**
+     * Builds the uid to transform processor map from every data flow of the task config.
+     * <p>
+     * Data flows whose processor cannot be created ({@code createTransform} returns {@code null})
+     * are left out. The uid is generated from the flow's inlong group id and stream id.
+     *
+     * @param taskConfig task config to read the data flows from; may be {@code null}
+     * @return an immutable map of uid to processor, empty when {@code taskConfig} is {@code null}
+     */
+    private Map<String, TransformProcessor<String, Map<String, Object>>> reloadTransform(TaskConfig taskConfig) {
+        if (taskConfig == null) {
+            // reload() calls this unconditionally; reloadIdParamsFromTaskConfig accepts a null
+            // task config as well, so answer with an empty map instead of failing with an NPE
+            return ImmutableMap.of();
+        }
+        ImmutableMap.Builder<String, TransformProcessor<String, Map<String, Object>>> builder =
+                new ImmutableMap.Builder<>();
+
+        taskConfig.getClusterTagConfigs()
+                .stream()
+                .map(ClusterTagConfig::getDataFlowConfigs)
+                .flatMap(Collection::stream)
+                .forEach(flow -> {
+                    TransformProcessor<String, Map<String, Object>> transformProcessor =
+                            createTransform(flow);
+                    if (transformProcessor == null) {
+                        // creation failed and was already logged; skip this flow only
+                        return;
+                    }
+                    builder.put(InlongId.generateUid(flow.getInlongGroupId(), flow.getInlongStreamId()),
+                            transformProcessor);
+                });
+
+        return builder.build();
+    }
+
+    /**
+     * Creates the transform processor of one data flow from its transform config, source decoder
+     * and HTTP sink encoder.
+     *
+     * @param dataFlowConfig data flow to build the processor for
+     * @return the processor, or {@code null} when any step of the creation throws
+     */
+    private TransformProcessor<String, Map<String, Object>> createTransform(DataFlowConfig dataFlowConfig) {
+        try {
+            return TransformProcessor.create(
+                    createTransformConfig(dataFlowConfig),
+                    createSourceDecoder(dataFlowConfig.getSourceConfig()),
+                    createHttpSinkEncoder(dataFlowConfig.getSinkConfig()));
+        } catch (Exception e) {
+            // a broken flow must not abort the whole reload, but hand the throwable to the logger
+            // as the trailing argument so the stack trace and cause are not lost
+            LOG.error("failed to reload transform of dataflow={}, ex={}", dataFlowConfig.getDataflowId(),
+                    e.getMessage(), e);
+            return null;
+        }
+    }
+
+    /**
+     * Creates the map sink encoder for an HTTP sink config.
+     * Every field config becomes a {@link FieldInfo} holding the field name and the type converter
+     * derived from its format info; the encoder uses the sink config's encoding type.
+     *
+     * @param config sink config of the data flow; must be an {@link HttpSinkConfig}
+     * @return the map encoder created by {@link SinkEncoderFactory}
+     * @throws IllegalArgumentException when {@code config} is not an {@link HttpSinkConfig}
+     */
+    private MapSinkEncoder createHttpSinkEncoder(SinkConfig config) {
+        if (!(config instanceof HttpSinkConfig)) {
+            // name the type that is actually checked and report what was received instead
+            throw new IllegalArgumentException("sinkConfig must be an instance of HttpSinkConfig, but was "
+                    + (config == null ? "null" : config.getClass().getName()));
+        }
+        HttpSinkConfig sinkConfig = (HttpSinkConfig) config;
+        List<FieldInfo> fieldInfos = sinkConfig.getFieldConfigs()
+                .stream()
+                .map(conf -> new FieldInfo(conf.getName(), deriveTypeConverter(conf.getFormatInfo())))
+                .collect(Collectors.toList());
+
+        MapSinkInfo sinkInfo = new MapSinkInfo(sinkConfig.getEncodingType(), fieldInfos);
+        return SinkEncoderFactory.createMapEncoder(sinkInfo);
+    }
+
private Map<String, HttpIdConfig> reloadIdParamsFromTaskConfig(TaskConfig
taskConfig) {
if (taskConfig == null) {
return new HashMap<>();
@@ -401,4 +465,7 @@ public class HttpSinkContext extends SinkContext {
return null;
}
+    /**
+     * Looks up the transform processor stored under the given uid.
+     *
+     * @param uid key into the transform map
+     * @return the processor mapped to {@code uid}, or {@code null} when the map has no such entry
+     */
+    public TransformProcessor<String, Map<String, Object>> getTransformProcessor(String uid) {
+        return transformMap.get(uid);
+    }
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
index 0268c40821..d1403e2807 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java
@@ -27,7 +27,8 @@ import java.util.List;
public interface IEvent2HttpRequestHandler {
- HttpRequest parse(HttpSinkContext context, ProfileEvent event) throws
URISyntaxException, JsonProcessingException;
+ List<HttpRequest> parse(HttpSinkContext context, ProfileEvent event)
+ throws URISyntaxException, JsonProcessingException;
List<HttpRequest> parse(HttpSinkContext context, DispatchProfile
dispatchProfile)
throws URISyntaxException, JsonProcessingException;