This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e68c69f986 [INLONG-10208][Sort] ClsSink support unified configuration (#10220)
e68c69f986 is described below
commit e68c69f986df57f04d0ef0ba067db07ae2912f4e
Author: vernedeng <[email protected]>
AuthorDate: Wed May 15 15:54:21 2024 +0800
[INLONG-10208][Sort] ClsSink support unified configuration (#10220)
* [INLONG-10208][Sort] ClsSink support unified configuration
---
.../inlong/common/pojo/sort/SortClusterConfig.java | 4 +
.../inlong/common/pojo/sort/SortTaskConfig.java | 4 +
.../common/pojo/sort/dataflow/DataFlowConfig.java | 6 +
.../resource/sort/DefaultSortConfigOperator.java | 2 +
.../inlong/sort/standalone/sink/SinkContext.java | 1 +
.../sort/standalone/sink/cls/ClsIdConfig.java | 49 +++--
.../inlong/sort/standalone/sink/cls/ClsSink.java | 26 +--
.../sort/standalone/sink/cls/ClsSinkContext.java | 167 +++--------------
.../sort/standalone/sink/{ => v2}/SinkContext.java | 108 +----------
.../source/sortsdk/v2/SortSdkSource.java | 205 +++++++++++++++++++++
.../sort/standalone/sink/cls/TestClsIdConfig.java | 4 +-
.../sink/cls/TestDefaultEvent2LogItemHandler.java | 3 +-
12 files changed, 296 insertions(+), 283 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
index 13109d1871..ff3d337cb1 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
@@ -20,14 +20,18 @@ package org.apache.inlong.common.pojo.sort;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
@Data
@Builder
+@AllArgsConstructor
+@NoArgsConstructor
public class SortClusterConfig implements Serializable {
private String clusterTag;
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
index e8795fb2be..b8f80ef26f 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
@@ -19,14 +19,18 @@ package org.apache.inlong.common.pojo.sort;
import org.apache.inlong.common.pojo.sort.node.NodeConfig;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
@Data
@Builder
+@AllArgsConstructor
+@NoArgsConstructor
public class SortTaskConfig implements Serializable {
private String sortTaskName;
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
index 7429a78be3..a7bb1c36a3 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
@@ -19,18 +19,24 @@ package org.apache.inlong.common.pojo.sort.dataflow;
import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Map;
@Data
@Builder
+@AllArgsConstructor
+@NoArgsConstructor
public class DataFlowConfig implements Serializable {
private String dataflowId;
private Integer version;
+ private String inlongGroupId;
+ private String inlongStreamId;
private SourceConfig sourceConfig;
private SinkConfig sinkConfig;
private Map<String, String> properties;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index b712ff11ee..5c51c1abcf 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -161,6 +161,8 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
.dataflowId(String.valueOf(sink.getId()))
.sourceConfig(getSourceConfig(groupInfo, streamInfo, sink))
.sinkConfig(getSinkConfig(sink))
+ .inlongGroupId(groupInfo.getInlongGroupId())
+ .inlongStreamId(streamInfo.getInlongStreamId())
.build();
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 7e8b26e3ca..16b15ab8b9 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -41,6 +41,7 @@ import java.util.TimerTask;
*
* SinkContext
*/
+@Deprecated
public class SinkContext {
public static final Logger LOG =
InlongLoggerFactory.getLogger(SinkContext.class);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
index faefaa6aa1..f0fd784acb 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
@@ -17,16 +17,26 @@
package org.apache.inlong.sort.standalone.sink.cls;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.ClsSinkConfig;
+import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Cls config of each uid.
*/
@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class ClsIdConfig {
private String inlongGroupId;
@@ -36,23 +46,28 @@ public class ClsIdConfig {
private String secretId;
private String secretKey;
private String topicId;
- private String fieldNames;
+ private List<String> fieldList;
private int fieldOffset = 2;
private int contentOffset = 0;
- private List<String> fieldList;
- /**
- * Parse fieldNames to list of fields.
- * @return List of fields.
- */
- public List<String> getFieldList() {
- if (fieldList == null) {
- this.fieldList = new ArrayList<>();
- if (fieldNames != null) {
- String[] fieldNameArray = fieldNames.split("\\s+");
- this.fieldList.addAll(Arrays.asList(fieldNameArray));
- }
- }
- return fieldList;
+ public static ClsIdConfig create(DataFlowConfig dataFlowConfig,
ClsNodeConfig nodeConfig) {
+ ClsSinkConfig sinkConfig = (ClsSinkConfig)
dataFlowConfig.getSinkConfig();
+ List<String> fields = sinkConfig.getFieldConfigs()
+ .stream()
+ .map(FieldConfig::getName)
+ .collect(Collectors.toList());
+ return ClsIdConfig.builder()
+ .inlongGroupId(dataFlowConfig.getInlongGroupId())
+ .inlongStreamId(dataFlowConfig.getInlongStreamId())
+ .contentOffset(sinkConfig.getContentOffset())
+ .fieldOffset(sinkConfig.getFieldOffset())
+ .separator(sinkConfig.getSeparator())
+ .fieldList(fields)
+ .topicId(sinkConfig.getTopicId())
+ .endpoint(nodeConfig.getEndpoint())
+ .secretId(nodeConfig.getSendSecretId())
+ .secretKey(nodeConfig.getSendSecretKey())
+ .build();
}
+
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
index 30289489e4..e959b249fe 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
@@ -27,13 +27,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-/**
- * Cls Sink implementation.
- *
- * <p>
- * Response for initialization of {@link ClsChannelWorker}.
- * </p>
- */
public class ClsSink extends AbstractSink implements Configurable {
private static final Logger LOG = LoggerFactory.getLogger(ClsSink.class);
@@ -42,9 +35,6 @@ public class ClsSink extends AbstractSink implements
Configurable {
private ClsSinkContext context;
private List<ClsChannelWorker> workers = new ArrayList<>();
- /**
- * Start {@link ClsChannelWorker}.
- */
@Override
public void start() {
super.start();
@@ -57,13 +47,10 @@ public class ClsSink extends AbstractSink implements
Configurable {
worker.start();
}
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ LOG.error("failed to start cls sink, ex={}", e.getMessage(), e);
}
}
- /**
- * Stop {@link ClsChannelWorker}.
- */
@Override
public void stop() {
super.stop();
@@ -74,24 +61,15 @@ public class ClsSink extends AbstractSink implements
Configurable {
}
this.workers.clear();
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ LOG.error("failed to stop cls sink, ex={}", e.getMessage(), e);
}
}
- /**
- * Process.
- * @return Status
- * @throws EventDeliveryException
- */
@Override
public Status process() throws EventDeliveryException {
return Status.BACKOFF;
}
- /**
- * Config parent context.
- * @param context Parent context.
- */
@Override
public void configure(Context context) {
LOG.info("start to configure:{}, context:{}.", this.getName(),
context.toString());
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 6d05717243..fef43cadfc 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -17,18 +17,18 @@
package org.apache.inlong.sort.standalone.sink.cls;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.SinkContext;
-import org.apache.inlong.sort.standalone.utils.Constants;
+import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tencentcloudapi.cls.producer.AsyncProducerClient;
@@ -41,16 +41,13 @@ import org.apache.flume.Context;
import org.slf4j.Logger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-/**
- * Cls sink context.
- */
public class ClsSinkContext extends SinkContext {
private static final Logger LOG =
InlongLoggerFactory.getLogger(ClsSinkContext.class);
@@ -74,20 +71,16 @@ public class ClsSinkContext extends SinkContext {
private final Map<String, AsyncProducerClient> clientMap;
private List<AsyncProducerClient> deletingClients = new ArrayList<>();
- private Context sinkContext;
private Map<String, ClsIdConfig> idConfigMap = new ConcurrentHashMap<>();
private IEvent2LogItemHandler event2LogItemHandler;
+ private ClsNodeConfig clsNodeConfig;
+ private ObjectMapper objectMapper;
- /**
- * Constructor
- *
- * @param sinkName Name of sink.
- * @param context Basic context.
- * @param channel Channel which worker acquire profile event from.
- */
public ClsSinkContext(String sinkName, Context context, Channel channel) {
super(sinkName, context, channel);
this.clientMap = new ConcurrentHashMap<>();
+ this.objectMapper = new ObjectMapper();
+
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
}
@Override
@@ -104,26 +97,25 @@ public class ClsSinkContext extends SinkContext {
}
});
- SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ SortTaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
if (newSortTaskConfig == null ||
newSortTaskConfig.equals(sortTaskConfig)) {
return;
}
LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName,
- new ObjectMapper().writeValueAsString(newSortTaskConfig));
+ objectMapper.writeValueAsString(newSortTaskConfig));
this.sortTaskConfig = newSortTaskConfig;
- this.sinkContext = new
Context(this.sortTaskConfig.getSinkParams());
+ ClsNodeConfig requestNodeConfig = (ClsNodeConfig)
sortTaskConfig.getNodeConfig();
+ this.clsNodeConfig =
+ requestNodeConfig.getVersion() >=
clsNodeConfig.getVersion() ? requestNodeConfig : clsNodeConfig;
+ this.keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH;
this.reloadIdParams();
this.reloadClients();
this.reloadHandler();
- this.keywordMaxLength =
sinkContext.getInteger(KEY_MAX_KEYWORD_LENGTH, DEFAULT_KEYWORD_MAX_LENGTH);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
- /**
- * Reload LogItemHandler.
- */
private void reloadHandler() {
String logItemHandlerClass =
CommonPropertiesHolder.getString(KEY_EVENT_LOG_ITEM_HANDLER,
DefaultEvent2LogItemHandler.class.getName());
@@ -136,47 +128,22 @@ public class ClsSinkContext extends SinkContext {
LOG.error("{} is not the instance of IEvent2LogItemHandler",
logItemHandlerClass);
}
} catch (Throwable t) {
- LOG.error("Fail to init IEvent2LogItemHandler, handlerClass:{},
error:{}",
+ LOG.error("fail to init IEvent2LogItemHandler, handlerClass:{},
error:{}",
logItemHandlerClass, t.getMessage());
}
}
- /**
- * Reload id params.
- *
- * @throws JsonProcessingException
- */
- private void reloadIdParams() throws JsonProcessingException {
- List<Map<String, String>> idList = this.sortTaskConfig.getIdParams();
- Map<String, ClsIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
- ObjectMapper objectMapper = new ObjectMapper();
-
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- for (Map<String, String> idParam : idList) {
- String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
- String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
- String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
- String jsonIdConfig = objectMapper.writeValueAsString(idParam);
- ClsIdConfig idConfig = objectMapper.readValue(jsonIdConfig,
ClsIdConfig.class);
- idConfig.getFieldList();
- newIdConfigMap.put(uid, idConfig);
- }
- this.idConfigMap = newIdConfigMap;
+ private void reloadIdParams() {
+ this.idConfigMap = this.sortTaskConfig.getClusters()
+ .stream()
+ .map(SortClusterConfig::getDataFlowConfigs)
+ .flatMap(Collection::stream)
+ .map(dataFlowConfig -> ClsIdConfig.create(dataFlowConfig,
clsNodeConfig))
+ .collect(Collectors.toMap(
+ config ->
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+ v -> v));
}
- /**
- * Close expire clients and start new clients.
- *
- * <p>
- * Each client response for data of one secretId.
- * </p>
- * <p>
- * First, find all secretId that are in the active clientMap but not in
the updated id config (or to say EXPIRE
- * secretId), and put those clients into deletingClientsMap. The real
close process will be done at the beginning of
- * next period of reloading. Second, find all secretIds that in the
updated id config but not in the active
- * clientMap(or to say NEW secretId), and start new clients for these
secretId and put them into the active
- * clientMap.
- * </p>
- */
private void reloadClients() {
// get update secretIds
Map<String, ClsIdConfig> updateConfigMap = idConfigMap.values()
@@ -196,76 +163,25 @@ public class ClsSinkContext extends SinkContext {
.forEach(this::startNewClient);
}
- /**
- * Start new cls client and put it to the active clientMap.
- *
- * @param idConfig idConfig of new client.
- */
private void startNewClient(ClsIdConfig idConfig) {
AsyncProducerConfig producerConfig = new AsyncProducerConfig(
idConfig.getEndpoint(),
idConfig.getSecretId(),
idConfig.getSecretKey(),
NetworkUtils.getLocalMachineIP());
- this.setCommonClientConfig(producerConfig);
AsyncProducerClient client = new AsyncProducerClient(producerConfig);
clientMap.put(idConfig.getSecretId(), client);
}
- /**
- * Get common client config from context and set them.
- *
- * @param config Config to be set.
- */
- private void setCommonClientConfig(AsyncProducerConfig config) {
- Optional.ofNullable(sinkContext.getInteger(KEY_TOTAL_SIZE_IN_BYTES))
- .ifPresent(config::setTotalSizeInBytes);
- Optional.ofNullable(sinkContext.getInteger(KEY_MAX_SEND_THREAD_COUNT))
- .ifPresent(config::setSendThreadCount);
- Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BLOCK_SEC))
- .ifPresent(config::setMaxBlockMs);
- Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BATCH_SIZE))
- .ifPresent(config::setBatchSizeThresholdInBytes);
- Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BATCH_COUNT))
- .ifPresent(config::setBatchCountThreshold);
- Optional.ofNullable(sinkContext.getInteger(KEY_LINGER_MS))
- .ifPresent(config::setLingerMs);
- Optional.ofNullable(sinkContext.getInteger(KEY_RETRIES))
- .ifPresent(config::setRetries);
- Optional.ofNullable(sinkContext.getInteger(KEY_MAX_RESERVED_ATTEMPTS))
- .ifPresent(config::setMaxReservedAttempts);
- Optional.ofNullable(sinkContext.getInteger(KEY_BASE_RETRY_BACKOFF_MS))
- .ifPresent(config::setBaseRetryBackoffMs);
- Optional.ofNullable(sinkContext.getInteger(KEY_MAX_RETRY_BACKOFF_MS))
- .ifPresent(config::setMaxRetryBackoffMs);
- }
-
- /**
- * Remove expire client from active clientMap and into the deleting client
list.
- * <P>
- * The reason why not close client when it remove from clientMap is to
avoid <b>Race Condition</b>. Which will
- * happen when worker thread get the client and ready to send msg, while
the reload thread try to close it.
- * </P>
- *
- * @param secretId SecretId of expire client.
- */
private void removeExpireClient(String secretId) {
AsyncProducerClient client = clientMap.get(secretId);
if (client == null) {
- LOG.error("Remove client failed, there is not client of {}",
secretId);
+ LOG.error("remove client failed, there is not client of {}",
secretId);
return;
}
deletingClients.add(clientMap.remove(secretId));
}
- /**
- * Add send result.
- *
- * @param currentRecord Event to be sent.
- * @param bid Topic or dest ip of event.
- * @param result Result of send.
- * @param sendTime Time of sending.
- */
public void addSendResultMetric(ProfileEvent currentRecord, String bid,
boolean result, long sendTime) {
Map<String, String> dimensions = this.getDimensions(currentRecord,
bid);
SortMetricItem metricItem =
this.getMetricItemSet().findMetricItem(dimensions);
@@ -288,13 +204,6 @@ public class ClsSinkContext extends SinkContext {
}
}
- /**
- * Get report dimensions.
- *
- * @param currentRecord Event.
- * @param bid Topic or dest ip.
- * @return Prepared dimensions map.
- */
private Map<String, String> getDimensions(ProfileEvent currentRecord,
String bid) {
Map<String, String> dimensions = new HashMap<>();
dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
@@ -309,40 +218,18 @@ public class ClsSinkContext extends SinkContext {
return dimensions;
}
- /**
- * Get {@link ClsIdConfig} by uid.
- *
- * @param uid Uid of event.
- * @return Corresponding cls id config.
- */
public ClsIdConfig getIdConfig(String uid) {
return idConfigMap.get(uid);
}
- /**
- * Get max length of single value.
- *
- * @return Max length of single value.
- */
public int getKeywordMaxLength() {
return keywordMaxLength;
}
- /**
- * Get LogItem handler.
- *
- * @return Handler.
- */
public IEvent2LogItemHandler getLogItemHandler() {
return event2LogItemHandler;
}
- /**
- * Get cls client.
- *
- * @param secretId ID of client.
- * @return Client instance.
- */
public AsyncProducerClient getClient(String secretId) {
return clientMap.get(secretId);
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
similarity index 75%
copy from
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
copy to
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
index 7e8b26e3ca..251a6d56af 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone.sink;
+package org.apache.inlong.sort.standalone.sink.v2;
import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
@@ -37,84 +37,57 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
-/**
- *
- * SinkContext
- */
public class SinkContext {
public static final Logger LOG =
InlongLoggerFactory.getLogger(SinkContext.class);
-
public static final String KEY_MAX_THREADS = "maxThreads";
public static final String KEY_PROCESSINTERVAL = "processInterval";
public static final String KEY_RELOADINTERVAL = "reloadInterval";
public static final String KEY_TASK_NAME = "taskName";
public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB =
"maxBufferQueueSizeKb";
public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
-
protected final String clusterId;
protected final String taskName;
protected final String sinkName;
protected final Context sinkContext;
-
protected SortTaskConfig sortTaskConfig;
-
protected final Channel channel;
- //
protected final int maxThreads;
protected final long processInterval;
protected final long reloadInterval;
- //
protected final SortMetricItemSet metricItemSet;
protected Timer reloadTimer;
- /**
- * Constructor
- *
- * @param sinkName
- * @param context
- * @param channel
- */
public SinkContext(String sinkName, Context context, Channel channel) {
this.sinkName = sinkName;
this.sinkContext = context;
this.channel = channel;
- this.clusterId =
context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
- this.taskName = context.getString(KEY_TASK_NAME);
+ this.clusterId =
sinkContext.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+ this.taskName = sinkContext.getString(KEY_TASK_NAME);
this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL,
100);
this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
- //
this.metricItemSet = new SortMetricItemSet(sinkName);
MetricRegister.register(this.metricItemSet);
}
- /**
- * start
- */
public void start() {
try {
this.reload();
this.setReloadTimer();
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ LOG.error("failed to start sink context", e);
}
}
- /**
- * close
- */
public void close() {
try {
this.reloadTimer.cancel();
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ LOG.error("failed to close sink context", e);
}
}
- /**
- * setReloadTimer
- */
protected void setReloadTimer() {
reloadTimer = new Timer(true);
TimerTask task = new TimerTask() {
@@ -126,113 +99,54 @@ public class SinkContext {
reloadTimer.schedule(task, new Date(System.currentTimeMillis() +
reloadInterval), reloadInterval);
}
- /**
- * reload
- */
public void reload() {
try {
- this.sortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ this.sortTaskConfig = SortConfigHolder.getTaskConfig(taskName);
} catch (Throwable e) {
- LOG.error(e.getMessage(), e);
+ LOG.error("failed to stop sink context", e);
}
}
- /**
- * get clusterId
- *
- * @return the clusterId
- */
public String getClusterId() {
return clusterId;
}
- /**
- * get taskName
- *
- * @return the taskName
- */
public String getTaskName() {
return taskName;
}
- /**
- * get sinkName
- *
- * @return the sinkName
- */
public String getSinkName() {
return sinkName;
}
- /**
- * get sinkContext
- *
- * @return the sinkContext
- */
public Context getSinkContext() {
return sinkContext;
}
- /**
- * get sortTaskConfig
- *
- * @return the sortTaskConfig
- */
public SortTaskConfig getSortTaskConfig() {
return sortTaskConfig;
}
- /**
- * get channel
- *
- * @return the channel
- */
public Channel getChannel() {
return channel;
}
- /**
- * get maxThreads
- *
- * @return the maxThreads
- */
public int getMaxThreads() {
return maxThreads;
}
- /**
- * get processInterval
- *
- * @return the processInterval
- */
public long getProcessInterval() {
return processInterval;
}
- /**
- * get reloadInterval
- *
- * @return the reloadInterval
- */
public long getReloadInterval() {
return reloadInterval;
}
- /**
- * get metricItemSet
- *
- * @return the metricItemSet
- */
public SortMetricItemSet getMetricItemSet() {
return metricItemSet;
}
- /**
- * fillInlongId
- *
- * @param currentRecord
- * @param dimensions
- */
public static void fillInlongId(ProfileEvent currentRecord, Map<String,
String> dimensions) {
String inlongGroupId = currentRecord.getInlongGroupId();
inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" :
inlongGroupId;
@@ -242,10 +156,6 @@ public class SinkContext {
dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
}
- /**
- * createBufferQueue
- * @return
- */
public static <U> BufferQueue<U> createBufferQueue() {
int maxBufferQueueSizeKb =
CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB,
DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java
new file mode 100644
index 0000000000..43245da9d0
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk.v2;
+
+import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.SortClientFactory;
+import org.apache.inlong.sort.standalone.admin.ConsumerServiceMBean;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
+import org.apache.inlong.sort.standalone.config.holder.SortSourceConfigType;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import
org.apache.inlong.sort.standalone.config.loader.ClassResourceQueryConsumeConfig;
+import
org.apache.inlong.sort.standalone.source.sortsdk.DefaultTopicChangeListener;
+import org.apache.inlong.sort.standalone.source.sortsdk.FetchCallback;
+import org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSourceContext;
+import org.apache.inlong.sort.standalone.utils.v2.FlumeConfigGenerator;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.AbstractSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default Source implementation of InLong.
+ *
+ * <p>
+ * SortSdkSource acquires messages from different upstream data stores by registering a {@link SortClient} for each
+ * sort task. The only thing SortSdkSource should do is to get one client by the sort task id, or remove one client
+ * when the task is finished or scheduled to another source instance.
+ * </p>
+ *
+ * <p>
+ * The Default Manager of InLong will schedule the partition and topic automatically.
+ * </p>
+ *
+ * <p>
+ * Because all sources should implement {@link Configurable}, the SortSdkSource should have a default constructor
+ * <b>WITHOUT</b> any arguments, and parameters will be configured by {@link Configurable#configure(Context)}.
+ * </p>
+ */
+public final class SortSdkSource extends AbstractSource
+        implements
+            Configurable,
+            Runnable,
+            EventDrivenSource,
+            ConsumerServiceMBean {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
+    /** Prefix of the common properties that are handed to every sort sdk client as parameters. */
+    public static final String SORT_SDK_PREFIX = "sortsdk.";
+    /** Thread count of the executor that runs the periodic reload task. */
+    private static final int CORE_POOL_SIZE = 1;
+    private static final SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest;
+    private static final String KEY_SORT_SDK_CLIENT_NUM = "sortSdkClientNum";
+    private static final int DEFAULT_SORT_SDK_CLIENT_NUM = 1;
+    private String taskName;
+    private SortSdkSourceContext context;
+    private String sortClusterName;
+    private long reloadInterval;
+    private ScheduledExecutorService pool;
+
+    // Clients created by start(); the list itself is never replaced, only appended to.
+    private final List<SortClient> sortClients = new ArrayList<>();
+
+    /**
+     * Creates the configured number of sort sdk clients for this task and keeps those that were
+     * created successfully. A client that fails to initialize is skipped.
+     */
+    @Override
+    public synchronized void start() {
+        int sortSdkClientNum = CommonPropertiesHolder.getInteger(KEY_SORT_SDK_CLIENT_NUM, DEFAULT_SORT_SDK_CLIENT_NUM);
+        LOG.info("start SortSdkSource:{}, client num is {}", taskName, sortSdkClientNum);
+        for (int i = 0; i < sortSdkClientNum; i++) {
+            SortClient client = this.newClient(taskName);
+            if (client != null) {
+                this.sortClients.add(client);
+            }
+        }
+    }
+
+    /**
+     * Stops the reload executor, then flags every client to stop consuming and closes it.
+     */
+    @Override
+    public void stop() {
+        // The executor is only created by configure(); do not fail if stop() arrives first.
+        if (pool != null) {
+            pool.shutdownNow();
+        }
+        LOG.info("close sort client {}.", taskName);
+        for (SortClient sortClient : sortClients) {
+            try {
+                sortClient.getConfig().setStopConsume(true);
+                sortClient.close();
+            } catch (Throwable th) {
+                // One client failing to close must not keep the remaining clients open.
+                LOG.error("got one throwable when close client of id:{}", taskName, th);
+            }
+        }
+    }
+
+    /**
+     * Periodic reload task: refreshes the manager api url of every client.
+     */
+    @Override
+    public void run() {
+        LOG.info("start to reload SortSdkSource:{}", taskName);
+        try {
+            for (SortClient sortClient : sortClients) {
+                sortClient.getConfig().setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
+            }
+        } catch (Throwable th) {
+            // scheduleAtFixedRate suppresses all later executions once a run throws,
+            // so the failure is logged here and the next scheduled reload still happens.
+            LOG.error("got one throwable when reload SortSdkSource:{}", taskName, th);
+        }
+    }
+
+    /**
+     * Reads the task name, cluster name and reload interval, starts the reload executor and
+     * registers this source as a {@link ConsumerServiceMBean}.
+     *
+     * @param context the flume context of this source
+     */
+    @Override
+    public void configure(Context context) {
+        this.taskName = context.getString(FlumeConfigGenerator.KEY_TASK_NAME);
+        this.context = new SortSdkSourceContext(getName(), context);
+        this.sortClusterName = SortConfigHolder.getSortConfig().getSortClusterName();
+        this.reloadInterval = this.context.getReloadInterval();
+        this.initReloadExecutor();
+        // register
+        AdminServiceRegister.register(ConsumerServiceMBean.MBEAN_TYPE, taskName, this);
+    }
+
+    /**
+     * Creates the scheduled executor that runs {@link #run()} at a fixed rate.
+     */
+    private void initReloadExecutor() {
+        // A repeated configure() would otherwise leak the previous executor and its thread.
+        if (this.pool != null) {
+            this.pool.shutdownNow();
+        }
+        this.pool = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
+        // NOTE(review): reloadInterval is interpreted as SECONDS here - confirm that this matches the unit
+        // SortSdkSourceContext#getReloadInterval returns.
+        pool.scheduleAtFixedRate(this, reloadInterval, reloadInterval, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Creates and initializes one sort sdk client for the given task.
+     *
+     * <p>
+     * The way the client queries its consume config is selected by the common property
+     * {@code SortSourceConfigType.KEY_TYPE}: file, manager (the default), or the class name of a custom
+     * {@link QueryConsumeConfig} implementation.
+     * </p>
+     *
+     * @param sortTaskName name of the sort task the client consumes for
+     * @return the initialized client, or {@code null} if it could not be created or initialized
+     */
+    private SortClient newClient(final String sortTaskName) {
+        LOG.info("start a new sort client for task: {}", sortTaskName);
+        // Declared outside the try block so that a half-initialized client can be closed on failure.
+        SortClient client = null;
+        try {
+            final SortClientConfig clientConfig = new SortClientConfig(sortTaskName, this.sortClusterName,
+                    new DefaultTopicChangeListener(),
+                    SortSdkSource.defaultStrategy, InetAddress.getLocalHost().getHostAddress());
+            final FetchCallback callback = FetchCallback.Factory.create(sortTaskName, getChannelProcessor(), context);
+            clientConfig.setCallback(callback);
+            Map<String, String> sortSdkParams = this.getSortClientConfigParameters();
+            clientConfig.setParameters(sortSdkParams);
+
+            // create SortClient
+            // NOTE(review): the type is read with SortSourceConfigType but compared against SortClusterConfigType
+            // names - confirm that both enums share the FILE/MANAGER constants.
+            String configType = CommonPropertiesHolder
+                    .getString(SortSourceConfigType.KEY_TYPE, SortSourceConfigType.MANAGER.name());
+            if (SortClusterConfigType.FILE.name().equalsIgnoreCase(configType)) {
+                LOG.info("create sort sdk client in file way:{}", configType);
+                ClassResourceQueryConsumeConfig queryConfig = new ClassResourceQueryConsumeConfig();
+                client = SortClientFactory.createSortClient(clientConfig, queryConfig);
+            } else if (SortClusterConfigType.MANAGER.name().equalsIgnoreCase(configType)) {
+                LOG.info("create sort sdk client in manager way:{}", configType);
+                clientConfig.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
+                client = SortClientFactory.createSortClient(clientConfig);
+            } else {
+                LOG.info("create sort sdk client in custom way:{}", configType);
+                Class<?> loaderClass = ClassUtils.getClass(configType);
+                Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+                if (loaderObject instanceof Configurable) {
+                    ((Configurable) loaderObject).configure(new Context(CommonPropertiesHolder.get()));
+                }
+                if (!(loaderObject instanceof QueryConsumeConfig)) {
+                    LOG.error("got exception when create QueryConsumeConfig instance, config key:{},config class:{}",
+                            SortSourceConfigType.KEY_TYPE, configType);
+                    return null;
+                }
+                // if it specifies the type of QueryConsumeConfig.
+                client = SortClientFactory.createSortClient(clientConfig, (QueryConsumeConfig) loaderObject);
+            }
+
+            client.init();
+            callback.setClient(client);
+            return client;
+        } catch (Throwable th) {
+            LOG.error("got one throwable when init client of id:{}", sortTaskName, th);
+            // The client may already exist when init() fails; close it so it is not leaked.
+            if (client != null) {
+                try {
+                    client.close();
+                } catch (Throwable closeTh) {
+                    LOG.error("got one throwable when close client of id:{}", sortTaskName, closeTh);
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Collects the common properties under {@link #SORT_SDK_PREFIX} into a new mutable map.
+     *
+     * @return a copy of the sort sdk parameters
+     */
+    private Map<String, String> getSortClientConfigParameters() {
+        Map<String, String> commonParams = CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
+        return new HashMap<>(commonParams);
+    }
+
+    /**
+     * Sets the stop-consume flag on the config of every client.
+     */
+    @Override
+    public void stopConsumer() {
+        for (SortClient sortClient : sortClients) {
+            sortClient.getConfig().setStopConsume(true);
+        }
+    }
+
+    /**
+     * Clears the stop-consume flag on the config of every client.
+     */
+    @Override
+    public void recoverConsumer() {
+        for (SortClient sortClient : sortClients) {
+            sortClient.getConfig().setStopConsume(false);
+        }
+    }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
index 270760f317..bfeb0adde6 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
@@ -23,6 +23,7 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.Arrays;
import java.util.List;
@RunWith(PowerMockRunner.class)
@@ -32,8 +33,7 @@ public class TestClsIdConfig {
@Test
public void testGetFieldList() {
ClsIdConfig idConfig = new ClsIdConfig();
- String testFieldName = "1 2 3 4 5 6 7";
- idConfig.setFieldNames(testFieldName);
+ idConfig.setFieldList(Arrays.asList("1", "2", "3", "4", "5", "6",
"7"));
List<String> fieldList = idConfig.getFieldList();
Assert.assertEquals(7, fieldList.size());
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
index 512a349190..efdf3cb52c 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
@@ -32,6 +32,7 @@ import
org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -69,7 +70,7 @@ public class TestDefaultEvent2LogItemHandler {
private ClsIdConfig prepareIdConfig() {
ClsIdConfig config = new ClsIdConfig();
- config.setFieldNames("f1 f2 f3 f4 f5 f6 f7 f8");
+ config.setFieldList(Arrays.asList("f1", "f2", "f3", "f4", "f5", "f6",
"f7", "f8"));
config.setInlongGroupId("testGroup");
config.setInlongStreamId("testStream");
config.setSecretId("testSecretId");