This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ce3e996e16 [INLONG-11836][Sort] Provide SortStandalone flow control to
prevent single-task blocking from affecting the normal sorting of other tasks
(#11837)
ce3e996e16 is described below
commit ce3e996e1659b50d4fc99f5d0833f0021056d0b4
Author: ChunLiang Lu <[email protected]>
AuthorDate: Tue Apr 22 12:50:09 2025 +0800
[INLONG-11836][Sort] Provide SortStandalone flow control to prevent
single-task blocking from affecting the normal sorting of other tasks (#11837)
---
.../standalone/metrics/status/SortTaskStatus.java | 72 ++++++++++
.../status/SortTaskStatusMetricListener.java | 47 +++++++
.../metrics/status/SortTaskStatusRepository.java | 151 +++++++++++++++++++++
.../sink/kafka/KafkaProducerCluster.java | 3 +-
.../standalone/source/sortsdk/FetchCallback.java | 2 +
.../inlong/sort/standalone/v2/SortCluster.java | 17 ++-
6 files changed, 288 insertions(+), 4 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatus.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatus.java
new file mode 100644
index 0000000000..cd2aa48733
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics.status;
+
+import lombok.Data;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Send status of a single sort task, used by the SortStandalone fail-pause flow control.
+ * <p>
+ * The class holds a "first success" gate (a flag plus a semaphore that starts with one permit),
+ * send / send-fail counters that are reset every time {@link #needPauseSortTask(int, int)} is
+ * evaluated, and the time at which the task was last judged to need pausing.
+ */
+@Data
+public class SortTaskStatus {
+
+    /**
+     * Soft cap on unused permits kept in {@link #firstSuccessPermit}. Once the first success has
+     * been recorded nothing in this class acquires permits any more, so releasing one permit per
+     * call without a cap would grow the permit count until {@link Semaphore#release(int)} fails
+     * with "Maximum permit count exceeded".
+     */
+    private static final int MAX_BANKED_PERMITS = 1 << 16;
+
+    private String taskName;
+    // volatile: written by firstSuccess() and read through the generated getter, possibly on
+    // different threads
+    private volatile boolean hasFirstSuccess = false;
+    private Semaphore firstSuccessPermit = new Semaphore(1);
+    private AtomicLong sendCount = new AtomicLong(0);
+    private AtomicLong sendFailCount = new AtomicLong(0);
+    // Long.MIN_VALUE means "needPauseSortTask has never returned true for this status"
+    private volatile long previousPauseSortTaskTime = Long.MIN_VALUE;
+
+    public SortTaskStatus(String taskName) {
+        this.taskName = taskName;
+    }
+
+    /**
+     * Takes one permit from the first-success gate, blocking until a permit is available.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void tryFirstSend() throws InterruptedException {
+        firstSuccessPermit.acquire(1);
+    }
+
+    /**
+     * Marks the task as having sent successfully and hands one permit back to the gate.
+     * <p>
+     * The flag is set before the permit is released. The release is skipped once
+     * {@link #MAX_BANKED_PERMITS} unused permits are already available, which keeps the permit
+     * count bounded no matter how often this method is called.
+     */
+    public void firstSuccess() {
+        hasFirstSuccess = true;
+        if (firstSuccessPermit.availablePermits() < MAX_BANKED_PERMITS) {
+            firstSuccessPermit.release(1);
+        }
+    }
+
+    /**
+     * Decides whether the task should be paused, based on the counters accumulated since the
+     * previous call. Both counters are read and reset to zero on every call.
+     *
+     * @param failCountLimit        minimum number of failures required before pausing
+     * @param failCountPercentLimit minimum value of {@code failures * 100 / sends} required
+     *                              before pausing
+     * @return {@code true} if both limits are reached and at least one send was counted; in that
+     *         case the current time is recorded as the pause time
+     */
+    public boolean needPauseSortTask(int failCountLimit, int failCountPercentLimit) {
+        long sendFail = this.sendFailCount.getAndSet(0);
+        long send = this.sendCount.getAndSet(0);
+        if (sendFail < failCountLimit) {
+            return false;
+        }
+        if (send <= 0) {
+            return false;
+        }
+        long rate = sendFail * 100 / send;
+        if (rate < failCountPercentLimit) {
+            return false;
+        }
+        this.previousPauseSortTaskTime = System.currentTimeMillis();
+        return true;
+    }
+
+    /**
+     * Tells whether more than {@code pauseIntervalMs} milliseconds have passed since the task was
+     * last judged to need pausing.
+     *
+     * @param pauseIntervalMs pause duration in milliseconds
+     * @return {@code true} if no pause time has been recorded, or if the time elapsed since the
+     *         recorded pause time is greater than {@code pauseIntervalMs}
+     */
+    public boolean canResumeSortTask(long pauseIntervalMs) {
+        long pausedAt = this.previousPauseSortTaskTime;
+        if (pausedAt == Long.MIN_VALUE) {
+            // never paused; answered directly so the sentinel never takes part in arithmetic
+            return true;
+        }
+        // compare elapsed time instead of computing "pausedAt + pauseIntervalMs", which
+        // overflows for very large intervals
+        return System.currentTimeMillis() - pausedAt > pauseIntervalMs;
+    }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusMetricListener.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusMetricListener.java
new file mode 100644
index 0000000000..6a594d5fa9
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusMetricListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics.status;
+
+import org.apache.inlong.common.metric.MetricItemValue;
+import org.apache.inlong.common.metric.MetricListener;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * SortTaskStatusMetricListener
+ * <p>
+ * Metric listener that passes every metric snapshot on to {@link SortTaskStatusRepository}.
+ */
+public class SortTaskStatusMetricListener implements MetricListener {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(SortTaskStatusMetricListener.class);
+
+    /**
+     * Hands the item values of a snapshot to {@link SortTaskStatusRepository#snapshot(List)}.
+     *
+     * @param domain     metric domain of the snapshot; not used by this listener
+     * @param itemValues metric item values contained in the snapshot
+     */
+    @Override
+    public void snapshot(String domain, List<MetricItemValue> itemValues) {
+        SortTaskStatusRepository.snapshot(itemValues);
+    }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusRepository.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusRepository.java
new file mode 100644
index 0000000000..b47fc7a95d
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusRepository.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics.status;
+
+import org.apache.inlong.common.metric.MetricItemValue;
+import org.apache.inlong.common.metric.MetricValue;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+import org.apache.flume.Context;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * SortTaskStatusRepository
+ *
+ */
+public class SortTaskStatusRepository {
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(SortTaskStatusRepository.class);
+
+ public static final String KEY_FAIL_PAUSE_ENABLE =
"sorttask.status.failPauseEnable";
+ public static final boolean DEFAULT_FAIL_PAUSE_ENABLE = false;
+ public static final String KEY_FAIL_COUNT_LIMIT =
"sorttask.status.failCountLimit";
+ public static final int DEFAULT_FAIL_COUNT_LIMIT = 10;
+ public static final String KEY_FAIL_COUNT_PERCENT_LIMIT =
"sorttask.status.failCountPercentLimit";
+ public static final int DEFAULT_FAIL_COUNT_PERCENT_LIMIT = 50;
+ public static final String KEY_PAUSE_INTERVAL_MS =
"sorttask.status.pauseIntervalMs";
+ public static final long DEFAULT_PAUSE_INTERVAL_MS = 300000;
+
+ private static final AtomicBoolean hasInited = new AtomicBoolean(false);
+ private static Context context;
+ private static final ConcurrentHashMap<String, SortTaskStatus> statusMap =
new ConcurrentHashMap<>();
+
+ private static boolean failPauseEnable;
+ private static int failCountLimit;
+ private static int failCountPercentLimit;
+ private static long pauseIntervalMs;
+
+ public static void init() {
+ LOG.info("start to init SortTaskStatusRepository");
+ if (!hasInited.compareAndSet(false, true)) {
+ return;
+ }
+ SortTaskStatusRepository.context = CommonPropertiesHolder.getContext();
+ SortTaskStatusRepository.failPauseEnable =
context.getBoolean(KEY_FAIL_PAUSE_ENABLE, DEFAULT_FAIL_PAUSE_ENABLE);
+ SortTaskStatusRepository.failCountLimit =
context.getInteger(KEY_FAIL_COUNT_LIMIT, DEFAULT_FAIL_COUNT_LIMIT);
+ SortTaskStatusRepository.failCountPercentLimit =
context.getInteger(KEY_FAIL_COUNT_PERCENT_LIMIT,
+ DEFAULT_FAIL_COUNT_PERCENT_LIMIT);
+ SortTaskStatusRepository.pauseIntervalMs =
context.getLong(KEY_PAUSE_INTERVAL_MS, DEFAULT_PAUSE_INTERVAL_MS);
+ }
+
+ public static void resetStatus(String taskName) {
+ statusMap.put(taskName, new SortTaskStatus(taskName));
+ }
+
+ public static void acquirePutChannel(String taskName) {
+ if (!hasInited.get()) {
+ init();
+ }
+ if (!failPauseEnable) {
+ return;
+ }
+ SortTaskStatus taskStatus = statusMap.computeIfAbsent(taskName, k ->
new SortTaskStatus(k));
+ if (taskStatus.isHasFirstSuccess()) {
+ return;
+ }
+ try {
+ taskStatus.tryFirstSend();
+ return;
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ public static boolean needPauseSortTask(String taskName) {
+ if (!hasInited.get()) {
+ init();
+ }
+ if (!failPauseEnable) {
+ return false;
+ }
+ SortTaskStatus taskStatus = statusMap.computeIfAbsent(taskName, k ->
new SortTaskStatus(k));
+ return taskStatus.needPauseSortTask(failCountLimit,
failCountPercentLimit);
+ }
+
+ public static boolean canResumeSortTask(String taskName) {
+ if (!hasInited.get()) {
+ init();
+ }
+ if (!failPauseEnable) {
+ return true;
+ }
+ SortTaskStatus taskStatus = statusMap.computeIfAbsent(taskName, k ->
new SortTaskStatus(k));
+ return taskStatus.canResumeSortTask(pauseIntervalMs);
+ }
+
+ public static void snapshot(List<MetricItemValue> itemValues) {
+ if (!hasInited.get()) {
+ init();
+ }
+ if (!failPauseEnable) {
+ return;
+ }
+ LOG.info("start to SortTaskStatusRepository status:{}", statusMap);
+ for (MetricItemValue itemValue : itemValues) {
+ Map<String, String> dimensions = itemValue.getDimensions();
+ String taskName = dimensions.get(SortMetricItem.KEY_TASK_NAME);
+ if (taskName == null) {
+ continue;
+ }
+ SortTaskStatus taskStatus = statusMap.computeIfAbsent(taskName, k
-> new SortTaskStatus(k));
+ Map<String, MetricValue> metrics = itemValue.getMetrics();
+ MetricValue sCount = metrics.get(SortMetricItem.M_SEND_COUNT);
+ if (sCount != null) {
+ taskStatus.getSendCount().addAndGet(sCount.value);
+ }
+ MetricValue sfCount =
metrics.get(SortMetricItem.M_SEND_FAIL_COUNT);
+ if (sfCount != null) {
+ taskStatus.getSendFailCount().addAndGet(sfCount.value);
+ }
+ MetricValue ssCount =
metrics.get(SortMetricItem.M_SEND_SUCCESS_COUNT);
+ if (ssCount != null) {
+ for (int i = 0; i < ssCount.value; i++) {
+ taskStatus.firstSuccess();
+ }
+ }
+ }
+ LOG.info("end to SortTaskStatusRepository status:{}", statusMap);
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index 5b4f2f9050..72b8ccc5e4 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -126,7 +126,8 @@ public class KafkaProducerCluster implements LifecycleAware
{
props.putAll(nodeConfig.getProperties() == null ? new HashMap<>()
: nodeConfig.getProperties());
props.put(ProducerConfig.ACKS_CONFIG, nodeConfig.getAcks());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
nodeConfig.getBootstrapServers());
- props.put(ProducerConfig.CLIENT_ID_CONFIG,
nodeConfig.getClientId() + "-" + workerName);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, nodeConfig.getClientId()
+ + "-" + workerName + "-" + System.currentTimeMillis());
LOG.info("init kafka client by node config info: " + props);
configuredMaxPayloadSize =
Long.parseLong(props.getProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG));
producer = new KafkaProducer<>(props, new StringSerializer(), new
ByteArraySerializer());
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
index 200b68714e..39c8813c64 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
@@ -24,6 +24,7 @@ import org.apache.inlong.sdk.sort.entity.MessageRecord;
import org.apache.inlong.sort.standalone.channel.CacheMessageRecord;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import
org.apache.inlong.sort.standalone.metrics.status.SortTaskStatusRepository;
import com.google.common.base.Preconditions;
import org.apache.flume.channel.ChannelProcessor;
@@ -101,6 +102,7 @@ public class FetchCallback implements ReadCallback {
CacheMessageRecord cacheRecord = new
CacheMessageRecord(messageRecord, client,
CommonPropertiesHolder.getAckPolicy());
for (InLongMessage inLongMessage : messageRecord.getMsgs()) {
+ SortTaskStatusRepository.acquirePutChannel(sortTaskName);
final ProfileEvent profileEvent = new
ProfileEvent(inLongMessage, cacheRecord);
channelProcessor.processEvent(profileEvent);
context.reportToMetric(profileEvent, sortTaskName, "-",
SortSdkSourceContext.FetchResult.SUCCESS);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
index 406c0ed0de..0e7cfad803 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
@@ -22,6 +22,7 @@ import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.sdk.commons.admin.AdminTask;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import
org.apache.inlong.sort.standalone.metrics.status.SortTaskStatusRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.flume.Context;
@@ -97,9 +98,12 @@ public class SortCluster {
if (taskMap.containsKey(newTaskName)) {
continue;
}
- SortTask newTask = new SortTask(newTaskName);
- newTask.start();
- this.taskMap.put(newTaskName, newTask);
+ if (SortTaskStatusRepository.canResumeSortTask(newTaskName)) {
+ SortTaskStatusRepository.resetStatus(newTaskName);
+ SortTask newTask = new SortTask(newTaskName);
+ newTask.start();
+ this.taskMap.put(newTaskName, newTask);
+ }
}
// remove task
deletingTasks.clear();
@@ -116,6 +120,13 @@ public class SortCluster {
this.deletingTasks.add(entry.getValue());
}
}
+ // failPauseTask
+ for (Map.Entry<String, SortTask> entry : taskMap.entrySet()) {
+ String taskName = entry.getKey();
+ if (SortTaskStatusRepository.needPauseSortTask(taskName)) {
+ this.deletingTasks.add(entry.getValue());
+ }
+ }
// stop deleting task list
for (SortTask task : deletingTasks) {
task.stop();