This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5c6aabe607 [INLONG-10729][Sort] Sortstandalone EsSink support transform
(#10734)
5c6aabe607 is described below
commit 5c6aabe607c6a67f895f402de5abed42f1f97afd
Author: vernedeng <[email protected]>
AuthorDate: Sun Aug 4 10:08:59 2024 +0800
[INLONG-10729][Sort] Sortstandalone EsSink support transform (#10734)
---
.../sdk/transform/encode/MapSinkEncoder.java | 74 ++++++++++++++++++++++
.../sdk/transform/encode/SinkEncoderFactory.java | 5 ++
.../inlong/sdk/transform/pojo/CsvSinkInfo.java | 2 +-
.../inlong/sdk/transform/pojo/FieldInfo.java | 12 ++++
.../inlong/sdk/transform/pojo/KvSinkInfo.java | 2 +-
.../pojo/{KvSinkInfo.java => MapSinkInfo.java} | 39 ++----------
.../apache/inlong/sdk/transform/pojo/SinkInfo.java | 10 ++-
.../inlong/sdk/transform/pojo/TransformConfig.java | 3 +-
.../transform/process/converter/TypeConverter.java | 25 ++++----
.../inlong/sort/standalone/sink/SinkContext.java | 30 ++++++---
.../DefaultEvent2IndexRequestHandler.java | 31 +++++++++
.../sink/elasticsearch/EsChannelWorker.java | 31 ++++++---
.../standalone/sink/elasticsearch/EsIdConfig.java | 14 ++++
.../sink/elasticsearch/EsSinkContext.java | 68 ++++++++++++++++++++
.../elasticsearch/IEvent2IndexRequestHandler.java | 9 +++
.../sink/elasticsearch/TestEsSinkContext.java | 2 +
16 files changed, 292 insertions(+), 65 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
new file mode 100644
index 0000000000..139bfa43a3
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class MapSinkEncoder implements SinkEncoder<Map<String, Object>> {
+
+ private final MapSinkInfo sinkInfo;
+ private final Map<String, TypeConverter> converters;
+
+ public MapSinkEncoder(MapSinkInfo sinkInfo) {
+ this.sinkInfo = sinkInfo;
+ this.converters = sinkInfo.getFields()
+ .stream()
+ .collect(Collectors.toMap(FieldInfo::getName,
+ info -> info.getConverter() == null ?
TypeConverter.DefaultTypeConverter()
+ : info.getConverter()));
+ }
+
+ @Override
+ public Map<String, Object> encode(SinkData sinkData, Context context) {
+ Map<String, Object> esMap = new HashMap<>();
+ for (FieldInfo fieldInfo : sinkInfo.getFields()) {
+ String fieldName = fieldInfo.getName();
+ String strValue = sinkData.getField(fieldName);
+ TypeConverter converter = converters.get(fieldName);
+ if (converter == null) {
+ esMap.put(fieldName, strValue);
+ continue;
+ }
+
+ try {
+ esMap.put(fieldName, converter.convert(strValue));
+ } catch (Throwable t) {
+ log.warn("failed to serialize field ={}, value={}", fieldName,
strValue, t);
+ esMap.put(fieldName, null);
+ }
+ }
+
+ return esMap;
+ }
+
+ @Override
+ public List<FieldInfo> getFields() {
+ return sinkInfo.getFields();
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
index f95d19bfca..30619078ac 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.encode;
import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
public class SinkEncoderFactory {
@@ -29,4 +30,8 @@ public class SinkEncoderFactory {
public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) {
return new KvSinkEncoder(kvSinkInfo);
}
+
+ public static MapSinkEncoder createMapEncoder(MapSinkInfo mapSinkInfo) {
+ return new MapSinkEncoder(mapSinkInfo);
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
index 29c80c3f90..063552184b 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
@@ -40,7 +40,7 @@ public class CsvSinkInfo extends SinkInfo {
@JsonProperty("delimiter") Character delimiter,
@JsonProperty("escapeChar") Character escapeChar,
@JsonProperty("fields") List<FieldInfo> fields) {
- super(SourceInfo.CSV, charset);
+ super(SinkInfo.CSV, charset);
this.delimiter = delimiter;
this.escapeChar = escapeChar;
if (fields != null) {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
index 46106e534f..1027dad944 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.transform.pojo;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
+
import lombok.Data;
/**
@@ -26,4 +28,14 @@ import lombok.Data;
public class FieldInfo {
private String name;
+ private TypeConverter converter;
+
+ public FieldInfo() {
+
+ }
+
+ public FieldInfo(String name, TypeConverter converter) {
+ this.name = name;
+ this.converter = converter;
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
index 49ff98e599..02111ab852 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
@@ -42,7 +42,7 @@ public class KvSinkInfo extends SinkInfo {
public KvSinkInfo(
@JsonProperty("charset") String charset,
@JsonProperty("fields") List<FieldInfo> fields) {
- super(SourceInfo.KV, charset);
+ super(SinkInfo.KV, charset);
if (fields != null) {
this.fields = fields;
} else {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/MapSinkInfo.java
similarity index 64%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/MapSinkInfo.java
index 49ff98e599..d85347f6ea 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/MapSinkInfo.java
@@ -17,53 +17,28 @@
package org.apache.inlong.sdk.transform.pojo;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.experimental.SuperBuilder;
+import org.apache.commons.collections.CollectionUtils;
-import java.util.ArrayList;
import java.util.List;
-/**
- * KvSinkInfo
- */
@JsonIgnoreProperties(ignoreUnknown = true)
-@Data
@SuperBuilder
-public class KvSinkInfo extends SinkInfo {
+@Data
+public class MapSinkInfo extends SinkInfo {
- private Character kvDelimiter;
- private Character entryDelimiter;
private List<FieldInfo> fields;
- @JsonCreator
- public KvSinkInfo(
+ public MapSinkInfo(
@JsonProperty("charset") String charset,
@JsonProperty("fields") List<FieldInfo> fields) {
- super(SourceInfo.KV, charset);
- if (fields != null) {
- this.fields = fields;
- } else {
- this.fields = new ArrayList<>();
+ super(SinkInfo.ES_MAP, charset);
+ if (CollectionUtils.isEmpty(fields)) {
+ throw new IllegalArgumentException("failed to init map sink info,
fieldInfos is empty");
}
- }
-
- /**
- * get fields
- * @return the fields
- */
- @JsonProperty("fields")
- public List<FieldInfo> getFields() {
- return fields;
- }
-
- /**
- * set fields
- * @param fields the fields to set
- */
- public void setFields(List<FieldInfo> fields) {
this.fields = fields;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
index d7d029ab50..9c61c6b46c 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import lombok.Data;
import lombok.experimental.SuperBuilder;
import java.util.Optional;
@@ -33,12 +34,17 @@ import static
com.google.common.base.Preconditions.checkNotNull;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
- @Type(value = CsvSinkInfo.class, name = SourceInfo.CSV),
- @Type(value = KvSinkInfo.class, name = SourceInfo.KV),
+ @Type(value = CsvSinkInfo.class, name = SinkInfo.CSV),
+ @Type(value = KvSinkInfo.class, name = SinkInfo.KV),
})
@SuperBuilder
+@Data
public abstract class SinkInfo {
+ public static final String CSV = "csv";
+ public static final String KV = "kv";
+ public static final String ES_MAP = "es_map";
+
@JsonIgnore
private String type;
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
index b73f303233..2ce813ec03 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.pojo;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
@@ -42,7 +43,7 @@ public class TransformConfig {
@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql,
@JsonProperty("configuration") Map<String, Object> configuration) {
- this.transformSql = transformSql;
+ this.transformSql = Preconditions.checkNotNull(transformSql,
"transform sql should not be null");
this.configuration = configuration;
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/TypeConverter.java
similarity index 65%
copy from
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/TypeConverter.java
index 03677557f6..455156a5cc 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/TypeConverter.java
@@ -15,22 +15,23 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone.sink.elasticsearch;
-
-import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+package org.apache.inlong.sdk.transform.process.converter;
/**
- *
- * IEvent2IndexRequestHandler
+ * Converter to convert the transform intermediate string value to the given
data type
*/
-public interface IEvent2IndexRequestHandler {
+public interface TypeConverter {
/**
- * parse
- *
- * @param context
- * @param event
- * @return
+ *
+ * @param value String source value
+ * @return Converted type value
+ * @throws Exception Convert exception
*/
- EsIndexRequest parse(EsSinkContext context, ProfileEvent event);
+ Object convert(String value) throws Exception;
+
+ static TypeConverter DefaultTypeConverter() {
+ return value -> value;
+ }
+
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index af2892abc4..9b5e4bc4dd 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -25,6 +25,8 @@ import
org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import
org.apache.inlong.common.pojo.sort.dataflow.field.format.BasicFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sdk.transform.decode.SourceDecoder;
import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
@@ -32,7 +34,7 @@ import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
-import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
@@ -42,13 +44,14 @@ import
org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.slf4j.Logger;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
@@ -69,7 +72,6 @@ public class SinkContext {
protected final String sinkName;
protected final Context sinkContext;
protected TaskConfig taskConfig;
- protected Map<String, TransformProcessor<String, String>> transformMap;
@Deprecated
protected SortTaskConfig sortTaskConfig;
protected final Channel channel;
@@ -91,7 +93,6 @@ public class SinkContext {
this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
this.metricItemSet = new SortMetricItemSet(sinkName);
this.unifiedConfiguration =
CommonPropertiesHolder.useUnifiedConfiguration();
- this.transformMap = Maps.newConcurrentMap();
MetricRegister.register(this.metricItemSet);
}
@@ -197,7 +198,14 @@ public class SinkContext {
}
public TransformConfig createTransformConfig(DataFlowConfig
dataFlowConfig) {
- return new TransformConfig(dataFlowConfig.getTransformSql());
+ return new TransformConfig(dataFlowConfig.getTransformSql(),
globalConfiguration());
+ }
+
+ public Map<String, Object> globalConfiguration() {
+ Map<String, Object> globalConfiguration = new HashMap<>();
+ globalConfiguration.putAll(CommonPropertiesHolder.get());
+ globalConfiguration.putAll(sinkContext.getParameters());
+ return ImmutableMap.copyOf(globalConfiguration);
}
public SourceDecoder<String> createSourceDecoder(SourceConfig
sourceConfig) {
@@ -234,8 +242,14 @@ public class SinkContext {
}
public FieldInfo convertToTransformFieldInfo(FieldConfig config) {
- FieldInfo fieldInfo = new FieldInfo();
- fieldInfo.setName(config.getName());
- return fieldInfo;
+ return new FieldInfo(config.getName(),
deriveTypeConverter(config.getFormatInfo()));
+ }
+
+ public TypeConverter deriveTypeConverter(FormatInfo formatInfo) {
+
+ if (formatInfo instanceof BasicFormatInfo) {
+ return value -> ((BasicFormatInfo<?>)
formatInfo).deserialize(value);
+ }
+ return value -> value;
}
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
index a22ddc7990..d67b60beca 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
@@ -18,9 +18,12 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
import org.apache.inlong.sdk.commons.protocol.EventConstants;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
+import lombok.extern.slf4j.Slf4j;
+
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -28,11 +31,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
/**
*
* DefaultEvent2IndexRequestHandler
*/
+@Slf4j
public class DefaultEvent2IndexRequestHandler implements
IEvent2IndexRequestHandler {
public static final String KEY_EXTINFO = "extinfo";
@@ -105,6 +110,32 @@ public class DefaultEvent2IndexRequestHandler implements
IEvent2IndexRequestHand
return indexRequest;
}
+ @Override
+ public List<EsIndexRequest> parse(
+ EsSinkContext context,
+ ProfileEvent event,
+ TransformProcessor<String, Map<String, Object>> processor) {
+ if (processor == null) {
+ log.error("find no any transform processor for es sink");
+ return null;
+ }
+
+ String uid = event.getUid();
+ EsIdConfig idConfig = context.getIdConfig(uid);
+ String indexName = idConfig.parseIndexName(event.getRawLogTime());
+ byte[] bodyBytes = event.getBody();
+ String strContext = new String(bodyBytes, idConfig.getCharset());
+ // build
+ List<Map<String, Object>> esData = processor.transform(strContext);
+ return esData.stream()
+ .map(data -> {
+ EsIndexRequest indexRequest = new
EsIndexRequest(indexName, event);
+ indexRequest.source(data);
+ return indexRequest;
+ })
+ .collect(Collectors.toList());
+ }
+
/**
* getExtInfo
*
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
index c67edd0d44..49908b2326 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
@@ -18,7 +18,9 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
@@ -26,6 +28,8 @@ import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
/**
* EsChannelWorker
*/
@@ -90,16 +94,27 @@ public class EsChannelWorker extends Thread {
}
// to profileEvent
ProfileEvent profileEvent = (ProfileEvent) event;
- EsIndexRequest indexRequest = handler.parse(context, profileEvent);
- // offer queue
- if (indexRequest != null) {
- context.offerDispatchQueue(indexRequest);
+ if (!CommonPropertiesHolder.useUnifiedConfiguration()) {
+ EsIndexRequest indexRequest = handler.parse(context,
profileEvent);
+ // offer queue
+ if (indexRequest != null) {
+ context.offerDispatchQueue(indexRequest);
+ } else {
+ context.addSendFailMetric();
+ profileEvent.ack();
+ }
+ tx.commit();
} else {
- context.addSendFailMetric();
- profileEvent.ack();
+ List<EsIndexRequest> indexRequestList = handler.parse(
+ context, profileEvent,
context.getTransformProcessor(profileEvent.getUid()));
+ if (CollectionUtils.isNotEmpty(indexRequestList)) {
+ indexRequestList.forEach(context::offerDispatchQueue);
+ } else {
+ context.addSendFailMetric();
+ profileEvent.ack();
+ }
}
- tx.commit();
- return;
+
} catch (Throwable t) {
LOG.error("Process event failed!" + this.getName(), t);
try {
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
index cc98ab57d6..3136a49276 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
@@ -27,7 +27,9 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
+import lombok.extern.slf4j.Slf4j;
+import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
@@ -38,6 +40,7 @@ import java.util.stream.Collectors;
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
+@Slf4j
public class EsIdConfig extends IdConfig {
public static final String PATTERN_DAY = "{yyyyMMdd}";
@@ -71,6 +74,7 @@ public class EsIdConfig extends IdConfig {
private int fieldOffset = 2; // for ftime,extinfo
private int contentOffset = 0;// except for boss + tab(1)
private List<String> fieldList;
+ private Charset charset;
public static EsIdConfig create(DataFlowConfig dataFlowConfig) {
EsSinkConfig sinkConfig = (EsSinkConfig)
dataFlowConfig.getSinkConfig();
@@ -78,6 +82,15 @@ public class EsIdConfig extends IdConfig {
.stream()
.map(FieldConfig::getName)
.collect(Collectors.toList());
+ Charset charset;
+ try {
+ charset = Charset.forName(sinkConfig.getEncodingType());
+ } catch (Throwable t) {
+ log.warn("do not support encoding type={}, dataflow id={}",
+ sinkConfig.getEncodingType(),
dataFlowConfig.getDataflowId());
+ charset = Charset.defaultCharset();
+ }
+
return EsIdConfig.builder()
.inlongGroupId(dataFlowConfig.getInlongGroupId())
.inlongStreamId(dataFlowConfig.getInlongStreamId())
@@ -86,6 +99,7 @@ public class EsIdConfig extends IdConfig {
.separator(sinkConfig.getSeparator())
.indexNamePattern(sinkConfig.getIndexNamePattern())
.fieldList(fields)
+ .charset(charset)
.build();
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index f1f0cccf88..9c9664d775 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -19,8 +19,16 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch;
import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.TaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.EsSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
import org.apache.inlong.common.pojo.sort.node.EsNodeConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sdk.transform.encode.MapSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
@@ -37,6 +45,8 @@ import
org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import lombok.Getter;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@@ -119,6 +129,9 @@ public class EsSinkContext extends SinkContext {
private String strHttpHosts;
private HttpHost[] httpHosts;
+ @Getter
+ protected Map<String, TransformProcessor<String, Map<String, Object>>>
transformMap;
+
public EsSinkContext(String sinkName, Context context, Channel channel,
BufferQueue<EsIndexRequest> dispatchQueue) {
super(sinkName, context, channel);
@@ -155,9 +168,12 @@ public class EsSinkContext extends SinkContext {
// change current config
Map<String, EsIdConfig> fromTaskConfig =
reloadIdParamsFromTaskConfig(taskConfig);
+ Map<String, TransformProcessor<String, Map<String, Object>>>
transformProcessor =
+ reloadTransform(taskConfig);
Map<String, EsIdConfig> fromSortTaskConfig =
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
if (unifiedConfiguration) {
idConfigMap = fromTaskConfig;
+ transformMap = transformProcessor;
reloadClientsFromNodeConfig(esNodeConfig);
} else {
idConfigMap = fromSortTaskConfig;
@@ -207,6 +223,54 @@ public class EsSinkContext extends SinkContext {
return newIdConfigMap;
}
+ private Map<String, TransformProcessor<String, Map<String, Object>>>
reloadTransform(TaskConfig taskConfig) {
+ ImmutableMap.Builder<String, TransformProcessor<String, Map<String,
Object>>> builder =
+ new ImmutableMap.Builder<>();
+
+ taskConfig.getClusterTagConfigs()
+ .stream()
+ .map(ClusterTagConfig::getDataFlowConfigs)
+ .flatMap(Collection::stream)
+ .forEach(flow -> {
+ TransformProcessor<String, Map<String, Object>>
transformProcessor =
+ createTransform(flow);
+ if (transformProcessor == null) {
+ return;
+ }
+ builder.put(InlongId.generateUid(flow.getInlongGroupId(),
flow.getInlongStreamId()),
+ transformProcessor);
+ });
+
+ return builder.build();
+ }
+
+ private TransformProcessor<String, Map<String, Object>>
createTransform(DataFlowConfig dataFlowConfig) {
+ try {
+ return TransformProcessor.create(
+ createTransformConfig(dataFlowConfig),
+ createSourceDecoder(dataFlowConfig.getSourceConfig()),
+ createEsSinkEncoder(dataFlowConfig.getSinkConfig()));
+ } catch (Exception e) {
+ LOG.error("failed to reload transform of dataflow={}, ex={}",
dataFlowConfig.getDataflowId(),
+ e.getMessage());
+ return null;
+ }
+ }
+
+ private MapSinkEncoder createEsSinkEncoder(SinkConfig sinkConfig) {
+ if (!(sinkConfig instanceof EsSinkConfig)) {
+ throw new IllegalArgumentException("sinkInfo must be an instance
of EsMapSinkInfo");
+ }
+ EsSinkConfig esSinkConfig = (EsSinkConfig) sinkConfig;
+ List<FieldInfo> fieldInfos = esSinkConfig.getFieldConfigs()
+ .stream()
+ .map(config -> new FieldInfo(config.getName(),
deriveTypeConverter(config.getFormatInfo())))
+ .collect(Collectors.toList());
+
+ MapSinkInfo sinkInfo = new MapSinkInfo(sinkConfig.getEncodingType(),
fieldInfos);
+ return SinkEncoderFactory.createMapEncoder(sinkInfo);
+ }
+
private void reloadClientsFromNodeConfig(EsNodeConfig esNodeConfig) {
Map<String, String> properties = esNodeConfig.getProperties();
this.sinkContext = new Context(properties != null ? properties : new
HashMap<>());
@@ -365,6 +429,10 @@ public class EsSinkContext extends SinkContext {
return this.idConfigMap.get(uid);
}
+ public TransformProcessor<String, Map<String, Object>>
getTransformProcessor(String uid) {
+ return this.transformMap.get(uid);
+ }
+
/**
* get nodeId
*
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
index 03677557f6..db2e4bb8ee 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
@@ -17,8 +17,12 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import java.util.List;
+import java.util.Map;
+
/**
*
* IEvent2IndexRequestHandler
@@ -33,4 +37,9 @@ public interface IEvent2IndexRequestHandler {
* @return
*/
EsIndexRequest parse(EsSinkContext context, ProfileEvent event);
+
+ List<EsIndexRequest> parse(
+ EsSinkContext context,
+ ProfileEvent event,
+ TransformProcessor<String, Map<String, Object>>
transformProcessor);
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
index 4e00d3e690..83d91f92a4 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.inlong.sort.standalone.utils.Constants;
@@ -107,6 +108,7 @@ public class TestEsSinkContext {
*/
@Test
public void test() throws Exception {
+ SortConfigMetricReporter.init(CommonPropertiesHolder.get());
BufferQueue<EsIndexRequest> dispatchQueue =
SinkContext.createBufferQueue();
EsSinkContext context = mock(dispatchQueue);
assertEquals(10, context.getBulkSizeMb());