This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ce3e996e16 [INLONG-11836][Sort] Provide SortStandalone flow control to 
prevent single-task blocking from affecting the normal sorting of other tasks 
(#11837)
ce3e996e16 is described below

commit ce3e996e1659b50d4fc99f5d0833f0021056d0b4
Author: ChunLiang Lu <[email protected]>
AuthorDate: Tue Apr 22 12:50:09 2025 +0800

    [INLONG-11836][Sort] Provide SortStandalone flow control to prevent 
single-task blocking from affecting the normal sorting of other tasks (#11837)
---
 .../standalone/metrics/status/SortTaskStatus.java  |  72 ++++++++++
 .../status/SortTaskStatusMetricListener.java       |  47 +++++++
 .../metrics/status/SortTaskStatusRepository.java   | 151 +++++++++++++++++++++
 .../sink/kafka/KafkaProducerCluster.java           |   3 +-
 .../standalone/source/sortsdk/FetchCallback.java   |   2 +
 .../inlong/sort/standalone/v2/SortCluster.java     |  17 ++-
 6 files changed, 288 insertions(+), 4 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatus.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatus.java
new file mode 100644
index 0000000000..cd2aa48733
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics.status;
+
+import lombok.Data;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Per-task send status used by the SortStandalone fail-pause flow control.
+ *
+ * <p>Holds the send / send-fail counters accumulated between two pause checks, a one-permit gate that
+ * lets only one caller through {@link #tryFirstSend()} until the first success has been reported, and
+ * the time at which the task was last judged to need pausing.
+ */
+@Data
+public class SortTaskStatus {
+
+    private String taskName;
+    // volatile: the flag is set by firstSuccess() and read by callers that may run on other threads
+    // (NOTE(review): snapshot vs. fetch callback threads - confirm against the callers).
+    private volatile boolean hasFirstSuccess = false;
+    private Semaphore firstSuccessPermit = new Semaphore(1);
+    private AtomicLong sendCount = new AtomicLong(0);
+    private AtomicLong sendFailCount = new AtomicLong(0);
+    // Long.MIN_VALUE is the sentinel for "never paused".
+    private volatile long previousPauseSortTaskTime = Long.MIN_VALUE;
+
+    public SortTaskStatus(String taskName) {
+        this.taskName = taskName;
+    }
+
+    /**
+     * Blocks until the first-send gate grants a permit.
+     *
+     * <p>If the gate has already been opened by {@link #firstSuccess()} the permit is handed straight
+     * back, so every other caller still blocked here is woken in turn instead of waiting forever.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void tryFirstSend() throws InterruptedException {
+        firstSuccessPermit.acquire(1);
+        if (hasFirstSuccess) {
+            firstSuccessPermit.release(1);
+        }
+    }
+
+    /**
+     * Marks the task as having sent successfully at least once and opens the first-send gate.
+     *
+     * <p>Only the first call releases a permit. Releasing on every call would grow the permit count
+     * without bound (nothing acquires once the flag is set) until {@link Semaphore#release(int)}
+     * fails with "Maximum permit count exceeded".
+     */
+    public void firstSuccess() {
+        if (hasFirstSuccess) {
+            return;
+        }
+        synchronized (this) {
+            if (hasFirstSuccess) {
+                return;
+            }
+            hasFirstSuccess = true;
+            firstSuccessPermit.release(1);
+        }
+    }
+
+    /**
+     * Decides whether the task should be paused, based on the counters accumulated since the last call.
+     *
+     * <p>Both counters are reset to zero on every call. The task needs pausing only when the fail count
+     * reaches {@code failCountLimit}, at least one send was counted, and the fail percentage
+     * ({@code sendFail * 100 / send}) reaches {@code failCountPercentLimit}. In that case the current
+     * time is recorded as the pause time.
+     *
+     * @param failCountLimit        minimum number of failures required to pause
+     * @param failCountPercentLimit minimum failure percentage required to pause
+     * @return {@code true} if the task should be paused
+     */
+    public boolean needPauseSortTask(int failCountLimit, int failCountPercentLimit) {
+        long sendFail = this.sendFailCount.getAndSet(0);
+        long send = this.sendCount.getAndSet(0);
+        if (sendFail < failCountLimit) {
+            return false;
+        }
+        if (send <= 0) {
+            return false;
+        }
+        long rate = sendFail * 100 / send;
+        if (rate < failCountPercentLimit) {
+            return false;
+        }
+        this.previousPauseSortTaskTime = System.currentTimeMillis();
+        return true;
+    }
+
+    /**
+     * Tells whether more than {@code pauseIntervalMs} has elapsed since the task was last paused.
+     *
+     * <p>A task that was never paused can always resume. The elapsed time is computed by subtraction
+     * so that a negative or very large interval cannot overflow the comparison.
+     *
+     * @param pauseIntervalMs minimum pause duration in milliseconds
+     * @return {@code true} if the task may be started again
+     */
+    public boolean canResumeSortTask(long pauseIntervalMs) {
+        long pausedAt = this.previousPauseSortTaskTime;
+        if (pausedAt == Long.MIN_VALUE) {
+            return true;
+        }
+        return System.currentTimeMillis() - pausedAt > pauseIntervalMs;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusMetricListener.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusMetricListener.java
new file mode 100644
index 0000000000..6a594d5fa9
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusMetricListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics.status;
+
+import org.apache.inlong.common.metric.MetricItemValue;
+import org.apache.inlong.common.metric.MetricListener;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Metric listener that forwards each metric snapshot to {@link SortTaskStatusRepository}.
+ */
+public class SortTaskStatusMetricListener implements MetricListener {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(SortTaskStatusMetricListener.class);
+
+    /**
+     * Hands the metric items of a snapshot over to {@link SortTaskStatusRepository#snapshot(List)}.
+     *
+     * @param domain     metric domain of the snapshot; not used by this listener
+     * @param itemValues metric items, passed through unchanged to the repository
+     */
+    @Override
+    public void snapshot(final String domain, final List<MetricItemValue> itemValues) {
+        SortTaskStatusRepository.snapshot(itemValues);
+    }
+
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusRepository.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusRepository.java
new file mode 100644
index 0000000000..b47fc7a95d
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/status/SortTaskStatusRepository.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics.status;
+
+import org.apache.inlong.common.metric.MetricItemValue;
+import org.apache.inlong.common.metric.MetricValue;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+import org.apache.flume.Context;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Static registry of {@link SortTaskStatus} objects keyed by sort task name.
+ *
+ * <p>Reads the fail-pause settings from the common properties on first use. While
+ * {@code sorttask.status.failPauseEnable} is false (the default) every operation is a no-op:
+ * nothing is ever paused and every task may resume.
+ */
+public class SortTaskStatusRepository {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(SortTaskStatusRepository.class);
+
+    public static final String KEY_FAIL_PAUSE_ENABLE = "sorttask.status.failPauseEnable";
+    public static final boolean DEFAULT_FAIL_PAUSE_ENABLE = false;
+    public static final String KEY_FAIL_COUNT_LIMIT = "sorttask.status.failCountLimit";
+    public static final int DEFAULT_FAIL_COUNT_LIMIT = 10;
+    public static final String KEY_FAIL_COUNT_PERCENT_LIMIT = "sorttask.status.failCountPercentLimit";
+    public static final int DEFAULT_FAIL_COUNT_PERCENT_LIMIT = 50;
+    public static final String KEY_PAUSE_INTERVAL_MS = "sorttask.status.pauseIntervalMs";
+    public static final long DEFAULT_PAUSE_INTERVAL_MS = 300000;
+
+    private static final AtomicBoolean hasInited = new AtomicBoolean(false);
+    private static Context context;
+    private static final ConcurrentHashMap<String, SortTaskStatus> statusMap = new ConcurrentHashMap<>();
+
+    private static boolean failPauseEnable;
+    private static int failCountLimit;
+    private static int failCountPercentLimit;
+    private static long pauseIntervalMs;
+
+    /**
+     * Loads the fail-pause settings from {@link CommonPropertiesHolder#getContext()}; later calls are no-ops.
+     *
+     * <p>The method is synchronized and {@code hasInited} is set only after every setting has been
+     * assigned, so a caller that observes {@code hasInited == true} also observes the loaded values.
+     * If loading throws, the flag stays false and the next call retries.
+     */
+    public static synchronized void init() {
+        if (hasInited.get()) {
+            return;
+        }
+        LOG.info("start to init SortTaskStatusRepository");
+        Context commonContext = CommonPropertiesHolder.getContext();
+        SortTaskStatusRepository.context = commonContext;
+        SortTaskStatusRepository.failPauseEnable =
+                commonContext.getBoolean(KEY_FAIL_PAUSE_ENABLE, DEFAULT_FAIL_PAUSE_ENABLE);
+        SortTaskStatusRepository.failCountLimit =
+                commonContext.getInteger(KEY_FAIL_COUNT_LIMIT, DEFAULT_FAIL_COUNT_LIMIT);
+        SortTaskStatusRepository.failCountPercentLimit =
+                commonContext.getInteger(KEY_FAIL_COUNT_PERCENT_LIMIT, DEFAULT_FAIL_COUNT_PERCENT_LIMIT);
+        SortTaskStatusRepository.pauseIntervalMs =
+                commonContext.getLong(KEY_PAUSE_INTERVAL_MS, DEFAULT_PAUSE_INTERVAL_MS);
+        // Publish last: readers check this flag before touching the settings above.
+        hasInited.set(true);
+    }
+
+    /**
+     * Replaces the status of {@code taskName} with a fresh one (counters, gate and pause time reset).
+     *
+     * @param taskName sort task name
+     */
+    public static void resetStatus(String taskName) {
+        statusMap.put(taskName, new SortTaskStatus(taskName));
+    }
+
+    /**
+     * Called before an event of {@code taskName} is put into the channel.
+     *
+     * <p>Returns immediately when fail-pause is disabled or the task already had a first success;
+     * otherwise blocks on the task's first-send gate. If the wait is interrupted the interrupt flag
+     * is restored and the method returns.
+     *
+     * @param taskName sort task name
+     */
+    public static void acquirePutChannel(String taskName) {
+        if (!isFailPauseEnabled()) {
+            return;
+        }
+        SortTaskStatus taskStatus = statusOf(taskName);
+        if (taskStatus.isHasFirstSuccess()) {
+            return;
+        }
+        try {
+            taskStatus.tryFirstSend();
+        } catch (InterruptedException e) {
+            // Keep the interruption visible to the caller instead of silently dropping it.
+            Thread.currentThread().interrupt();
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Tells whether {@code taskName} should be paused according to its accumulated counters.
+     *
+     * @param taskName sort task name
+     * @return {@code false} when fail-pause is disabled, otherwise the decision of
+     *         {@link SortTaskStatus#needPauseSortTask(int, int)}
+     */
+    public static boolean needPauseSortTask(String taskName) {
+        if (!isFailPauseEnabled()) {
+            return false;
+        }
+        return statusOf(taskName).needPauseSortTask(failCountLimit, failCountPercentLimit);
+    }
+
+    /**
+     * Tells whether {@code taskName} may be started (again).
+     *
+     * @param taskName sort task name
+     * @return {@code true} when fail-pause is disabled, otherwise the decision of
+     *         {@link SortTaskStatus#canResumeSortTask(long)}
+     */
+    public static boolean canResumeSortTask(String taskName) {
+        if (!isFailPauseEnabled()) {
+            return true;
+        }
+        return statusOf(taskName).canResumeSortTask(pauseIntervalMs);
+    }
+
+    /**
+     * Folds a metric snapshot into the per-task statuses.
+     *
+     * <p>Items without a task-name dimension are skipped. Send and send-fail values are added to the
+     * task's counters, and {@link SortTaskStatus#firstSuccess()} is invoked once per reported success.
+     *
+     * @param itemValues metric items of the snapshot
+     */
+    public static void snapshot(List<MetricItemValue> itemValues) {
+        if (!isFailPauseEnabled()) {
+            return;
+        }
+        LOG.info("start to SortTaskStatusRepository status:{}", statusMap);
+        for (MetricItemValue itemValue : itemValues) {
+            Map<String, String> dimensions = itemValue.getDimensions();
+            String taskName = dimensions.get(SortMetricItem.KEY_TASK_NAME);
+            if (taskName == null) {
+                continue;
+            }
+            SortTaskStatus taskStatus = statusOf(taskName);
+            Map<String, MetricValue> metrics = itemValue.getMetrics();
+            MetricValue sCount = metrics.get(SortMetricItem.M_SEND_COUNT);
+            if (sCount != null) {
+                taskStatus.getSendCount().addAndGet(sCount.value);
+            }
+            MetricValue sfCount = metrics.get(SortMetricItem.M_SEND_FAIL_COUNT);
+            if (sfCount != null) {
+                taskStatus.getSendFailCount().addAndGet(sfCount.value);
+            }
+            MetricValue ssCount = metrics.get(SortMetricItem.M_SEND_SUCCESS_COUNT);
+            if (ssCount != null) {
+                for (int i = 0; i < ssCount.value; i++) {
+                    taskStatus.firstSuccess();
+                }
+            }
+        }
+        LOG.info("end to SortTaskStatusRepository status:{}", statusMap);
+    }
+
+    /** Makes sure the settings are loaded, then reports whether fail-pause is switched on. */
+    private static boolean isFailPauseEnabled() {
+        if (!hasInited.get()) {
+            init();
+        }
+        return failPauseEnable;
+    }
+
+    /** Returns the status registered for {@code taskName}, creating it on first access. */
+    private static SortTaskStatus statusOf(String taskName) {
+        return statusMap.computeIfAbsent(taskName, SortTaskStatus::new);
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index 5b4f2f9050..72b8ccc5e4 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -126,7 +126,8 @@ public class KafkaProducerCluster implements LifecycleAware 
{
             props.putAll(nodeConfig.getProperties() == null ? new HashMap<>() 
: nodeConfig.getProperties());
             props.put(ProducerConfig.ACKS_CONFIG, nodeConfig.getAcks());
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
nodeConfig.getBootstrapServers());
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, 
nodeConfig.getClientId() + "-" + workerName);
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, nodeConfig.getClientId()
+                    + "-" + workerName + "-" + System.currentTimeMillis());
             LOG.info("init kafka client by node config info: " + props);
             configuredMaxPayloadSize = 
Long.parseLong(props.getProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG));
             producer = new KafkaProducer<>(props, new StringSerializer(), new 
ByteArraySerializer());
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
index 200b68714e..39c8813c64 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
@@ -24,6 +24,7 @@ import org.apache.inlong.sdk.sort.entity.MessageRecord;
 import org.apache.inlong.sort.standalone.channel.CacheMessageRecord;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import 
org.apache.inlong.sort.standalone.metrics.status.SortTaskStatusRepository;
 
 import com.google.common.base.Preconditions;
 import org.apache.flume.channel.ChannelProcessor;
@@ -101,6 +102,7 @@ public class FetchCallback implements ReadCallback {
             CacheMessageRecord cacheRecord = new 
CacheMessageRecord(messageRecord, client,
                     CommonPropertiesHolder.getAckPolicy());
             for (InLongMessage inLongMessage : messageRecord.getMsgs()) {
+                SortTaskStatusRepository.acquirePutChannel(sortTaskName);
                 final ProfileEvent profileEvent = new 
ProfileEvent(inLongMessage, cacheRecord);
                 channelProcessor.processEvent(profileEvent);
                 context.reportToMetric(profileEvent, sortTaskName, "-", 
SortSdkSourceContext.FetchResult.SUCCESS);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
index 406c0ed0de..0e7cfad803 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
@@ -22,6 +22,7 @@ import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.sdk.commons.admin.AdminTask;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import 
org.apache.inlong.sort.standalone.metrics.status.SortTaskStatusRepository;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flume.Context;
@@ -97,9 +98,12 @@ public class SortCluster {
                 if (taskMap.containsKey(newTaskName)) {
                     continue;
                 }
-                SortTask newTask = new SortTask(newTaskName);
-                newTask.start();
-                this.taskMap.put(newTaskName, newTask);
+                if (SortTaskStatusRepository.canResumeSortTask(newTaskName)) {
+                    SortTaskStatusRepository.resetStatus(newTaskName);
+                    SortTask newTask = new SortTask(newTaskName);
+                    newTask.start();
+                    this.taskMap.put(newTaskName, newTask);
+                }
             }
             // remove task
             deletingTasks.clear();
@@ -116,6 +120,13 @@ public class SortCluster {
                     this.deletingTasks.add(entry.getValue());
                 }
             }
+            // failPauseTask
+            for (Map.Entry<String, SortTask> entry : taskMap.entrySet()) {
+                String taskName = entry.getKey();
+                if (SortTaskStatusRepository.needPauseSortTask(taskName)) {
+                    this.deletingTasks.add(entry.getValue());
+                }
+            }
             // stop deleting task list
             for (SortTask task : deletingTasks) {
                 task.stop();

Reply via email to