This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b37f944f23 [INLONG-11734][SDK] Optimize SDK stop processing flow
(#11735)
b37f944f23 is described below
commit b37f944f2308c1c8c34bb4aa088d87153d7a4598
Author: Goson Zhang <[email protected]>
AuthorDate: Sun Feb 9 12:55:01 2025 +0800
[INLONG-11734][SDK] Optimize SDK stop processing flow (#11735)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/BaseMsgSenderFactory.java | 5 +-
.../sdk/dataproxy/common/ProxyClientConfig.java | 15 +++--
.../inlong/sdk/dataproxy/metric/MetricConfig.java | 19 +++---
.../sdk/dataproxy/metric/MetricDataHolder.java | 11 +++-
.../sdk/dataproxy/network/http/HttpClientMgr.java | 24 ++++---
.../sdk/dataproxy/network/tcp/TcpClientMgr.java | 74 +++++++++++++++++++---
.../inlong/sdk/dataproxy/sender/BaseSender.java | 5 +-
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 33 ++++++++++
8 files changed, 147 insertions(+), 39 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
index 72c1a6ac67..623a318188 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -67,6 +67,7 @@ public class BaseMsgSenderFactory {
public void close() {
int totalSenderCnt;
+ long startTime = System.currentTimeMillis();
logger.info("MsgSenderFactory({}) is closing", this.factoryNo);
senderCacheLock.writeLock().lock();
try {
@@ -77,8 +78,8 @@ public class BaseMsgSenderFactory {
} finally {
senderCacheLock.writeLock().unlock();
}
- logger.info("MsgSenderFactory({}) closed, release {} inlong senders",
- this.factoryNo, totalSenderCnt);
+ logger.info("MsgSenderFactory({}) closed, release {} inlong senders,
cost {} ms",
+ this.factoryNo, totalSenderCnt, System.currentTimeMillis() -
startTime);
}
public void removeClient(BaseSender msgSender) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
index 274b24f16d..acb6db81ba 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
@@ -379,11 +379,16 @@ public class ProxyClientConfig implements Cloneable {
return metricConfig.isEnableMetric();
}
- public void setMetricConfig(MetricConfig metricConfig) {
- if (metricConfig == null) {
- throw new IllegalArgumentException("metricConfig is null");
- }
- this.metricConfig = metricConfig;
+    /**
+     * Forwards the enable-metric flag to the {@code MetricConfig} held by this client config.
+     *
+     * @param enableMetric value handed unchanged to {@code metricConfig.setEnableMetric}
+     */
+    public void setEnableMetric(boolean enableMetric) {
+        final MetricConfig target = this.metricConfig;
+        target.setEnableMetric(enableMetric);
+    }
+
+    /**
+     * Forwards both interval values to the {@code MetricConfig} held by this client config.
+     *
+     * @param metricOutIntvlMs   first argument handed to {@code metricConfig.setMetricOutIntvlInfo}
+     * @param metricOutWarnIntMs second argument handed to {@code metricConfig.setMetricOutIntvlInfo}
+     */
+    public void setMetricOutIntvlInfo(long metricOutIntvlMs, long metricOutWarnIntMs) {
+        final MetricConfig target = this.metricConfig;
+        target.setMetricOutIntvlInfo(metricOutIntvlMs, metricOutWarnIntMs);
+    }
+
+    /**
+     * Forwards both mask flags to the {@code MetricConfig} held by this client config.
+     *
+     * @param maskGroupId  first argument handed to {@code metricConfig.setMetricKeyMaskInfos}
+     * @param maskStreamId second argument handed to {@code metricConfig.setMetricKeyMaskInfos}
+     */
+    public void setMetricKeyMaskInfos(boolean maskGroupId, boolean maskStreamId) {
+        final MetricConfig target = this.metricConfig;
+        target.setMetricKeyMaskInfos(maskGroupId, maskStreamId);
     }
public MetricConfig getMetricConfig() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java
index 6c4f7e2dae..e689781379 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java
@@ -69,9 +69,16 @@ public class MetricConfig {
return maskStreamId;
}
- public void setMetricOutIntvlMs(long metricOutIntvlMs) {
- if (metricOutIntvlMs >= MIN_METRIC_OUTPUT_INTVL_MS) {
- this.metricOutIntvlMs = metricOutIntvlMs;
+    /**
+     * Updates the metric output interval and the metric output-warning interval.
+     *
+     * <p>The two arguments are handled independently: a {@code null} value, or a value
+     * smaller than {@code MIN_METRIC_OUTPUT_INTVL_MS}, leaves the matching field untouched.
+     *
+     * @param metricOutIntvlMs   candidate value for {@code metricOutIntvlMs}; may be {@code null}
+     * @param metricOutWarnIntMs candidate value for {@code metricOutWarnIntMs}; may be {@code null}
+     */
+    public void setMetricOutIntvlInfo(Long metricOutIntvlMs, Long metricOutWarnIntMs) {
+        if (metricOutIntvlMs != null && metricOutIntvlMs >= MIN_METRIC_OUTPUT_INTVL_MS) {
+            this.metricOutIntvlMs = metricOutIntvlMs;
+        }
+        if (metricOutWarnIntMs != null && metricOutWarnIntMs >= MIN_METRIC_OUTPUT_INTVL_MS) {
+            this.metricOutWarnIntMs = metricOutWarnIntMs;
+        }
     }
@@ -83,12 +90,6 @@ public class MetricConfig {
return metricOutWarnIntMs;
}
- public void setMetricOutWarnIntMs(long metricOutWarnIntMs) {
- if (metricOutWarnIntMs >= MIN_METRIC_OUTPUT_INTVL_MS) {
- this.metricOutWarnIntMs = metricOutWarnIntMs;
- }
- }
-
public long getDateFormatIntvlMs() {
return dateFormatIntvlMs;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
index 222b8099b8..03e260789a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
@@ -85,11 +85,11 @@ public class MetricDataHolder implements Runnable {
public void close() {
logger.info("Metric DataHolder({}) closing ......",
this.sender.getSenderId());
// process rest data
- this.outputExecutor.shutdown();
long startTime = System.currentTimeMillis();
+ this.started = false;
+ this.outputExecutor.shutdown();
outputMetricData(startTime, getOldIndex());
outputMetricData(startTime, getCurIndex());
- this.started = false;
logger.info("Metric DataHolder({}) closed, cost = {} ms!",
this.sender.getSenderId(), System.currentTimeMillis() -
startTime);
}
@@ -181,7 +181,12 @@ public class MetricDataHolder implements Runnable {
} catch (InterruptedException e) {
break;
}
- } while (selectedUnit.refCnt.get() > 0);
+ } while (started && selectedUnit.refCnt.get() > 0);
+ if (!started) {
+ logger.info("Metric DataHolder({}) closed, stop output metric
info",
+ sender.getSenderId());
+ return;
+ }
StringBuilder strBuff = new StringBuilder(512);
String rptContent = buildMetricReportInfo(strBuff, reportTime,
selectedUnit);
logger.info("Metric DataHolder({}) output metricInfo={}",
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
index 72758a056e..bc4d2884c3 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
@@ -123,16 +123,22 @@ public class HttpClientMgr implements ClientMgr {
return;
}
int remainCnt = 0;
+ long stopTime = System.currentTimeMillis();
+ logger.info("ClientMgr({}) is closing...", this.sender.getSenderId());
if (!messageCache.isEmpty()) {
- long startTime = System.currentTimeMillis();
- while (!messageCache.isEmpty()) {
- if (System.currentTimeMillis() - startTime >=
httpConfig.getHttpCloseWaitPeriodMs()) {
- break;
+ if (httpConfig.isDiscardHttpCacheWhenClosing()) {
+ messageCache.clear();
+ } else {
+ long startTime = System.currentTimeMillis();
+ while (!messageCache.isEmpty()) {
+ if (System.currentTimeMillis() - startTime >=
httpConfig.getHttpCloseWaitPeriodMs()) {
+ break;
+ }
+ ProxyUtils.sleepSomeTime(100L);
}
- ProxyUtils.sleepSomeTime(100L);
+ remainCnt = messageCache.size();
+ messageCache.clear();
}
- remainCnt = messageCache.size();
- messageCache.clear();
}
workerServices.shutdown();
if (httpClient != null) {
@@ -142,8 +148,8 @@ public class HttpClientMgr implements ClientMgr {
//
}
}
- logger.info("ClientMgr({}) stopped, remain ({}) messages discarded!",
- this.sender.getSenderId(), remainCnt);
+ logger.info("ClientMgr({}) stopped, remain ({}) messages discarded,
cost {} ms!",
+ this.sender.getSenderId(), remainCnt,
System.currentTimeMillis() - stopTime);
}
@Override
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
index 5b4bf601c0..ec048f54ab 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
@@ -133,19 +133,14 @@ public class TcpClientMgr implements ClientMgr {
timerObj.stop();
}
this.bootstrap.config().group().shutdownGracefully();
-
this.maintThread.shutDown();
- if (!channelMsgIdMap.isEmpty()) {
- long startTime = System.currentTimeMillis();
- while (!channelMsgIdMap.isEmpty()) {
- if (System.currentTimeMillis() - startTime >=
tcpConfig.getConCloseWaitPeriodMs()) {
- break;
- }
- ProxyUtils.sleepSomeTime(100L);
- }
+ long startTime = System.currentTimeMillis();
+ if (!this.reqTimeouts.isEmpty()) {
+ notifyInflightMsgClosed();
}
this.activeNodes.clear();
- logger.info("ClientMgr({}) stopped!", senderId);
+ logger.info("ClientMgr({}) stopped, release cost {} ms!",
+ senderId, System.currentTimeMillis() - startTime);
}
@Override
@@ -552,6 +547,65 @@ public class TcpClientMgr implements ClientMgr {
return tmpBootstrap;
}
+    /**
+     * Fails every request that is still in flight with {@code ErrorCode.SDK_CLOSED}.
+     *
+     * <p>For each message id registered in {@code reqTimeouts} the method removes and cancels
+     * the timeout task, removes the pending {@code TcpCallFuture} from {@code reqObjects}, and
+     * looks up the client the request was sent on: first in {@code usingClientMaps}, then in
+     * {@code deletingClientMaps}. When a client with a matching channel term is found, the
+     * SDK_CLOSED acknowledgement is submitted to that channel's event loop, the client's
+     * in-flight counter is decremented and a callback-fail metric is recorded. When no
+     * matching client exists the future is dropped without a callback.
+     *
+     * <p>NOTE(review): entries are removed from {@code reqTimeouts} while its key set is being
+     * iterated; this presumes a concurrent map implementation -- confirm at the field declaration.
+     */
+    public void notifyInflightMsgClosed() {
+        long curTime;
+        Timeout timeoutTask;
+        TcpNettyClient nettyTcpClient;
+        for (Integer messageId : this.reqTimeouts.keySet()) {
+            if (messageId == null) {
+                continue;
+            }
+            timeoutTask = this.reqTimeouts.remove(messageId);
+            if (timeoutTask != null) {
+                timeoutTask.cancel();
+            }
+            TcpCallFuture callFuture = this.reqObjects.remove(messageId);
+            if (callFuture == null) {
+                continue;
+            }
+            curTime = System.currentTimeMillis();
+            // find and process in using clients
+            nettyTcpClient = usingClientMaps.get(callFuture.getClientAddr());
+            if (nettyTcpClient != null
+                    && nettyTcpClient.getChanTermId() == callFuture.getChanTerm()) {
+                try {
+                    nettyTcpClient.getChannel().eventLoop().execute(
+                            () -> callFuture.onMessageAck(new ProcessResult(ErrorCode.SDK_CLOSED)));
+                } catch (Throwable ex) {
+                    if (callbackExceptCnt.shouldPrint()) {
+                        logger.info("ClientMgr({}) closed, callback exception!",
+                                senderId, ex);
+                    }
+                } finally {
+                    nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+                    baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(),
+                            callFuture.getGroupId(), callFuture.getStreamId(), callFuture.getMsgCnt(),
+                            (System.currentTimeMillis() - curTime));
+                }
+                // this request is handled; move on to the next in-flight message
+                // (a return here would leave all remaining requests un-notified)
+                continue;
+            }
+            // find and process in deleting clients
+            nettyTcpClient = deletingClientMaps.get(callFuture.getClientAddr());
+            if (nettyTcpClient != null
+                    && nettyTcpClient.getChanTermId() == callFuture.getChanTerm()) {
+                try {
+                    nettyTcpClient.getChannel().eventLoop().execute(
+                            () -> callFuture.onMessageAck(new ProcessResult(ErrorCode.SDK_CLOSED)));
+                } catch (Throwable ex) {
+                    if (callbackExceptCnt.shouldPrint()) {
+                        logger.info("ClientMgr({}) closed, callback2 exception!",
+                                senderId, ex);
+                    }
+                } finally {
+                    nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+                    baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(),
+                            callFuture.getGroupId(), callFuture.getStreamId(), callFuture.getMsgCnt(),
+                            (System.currentTimeMillis() - curTime));
+                }
+            }
+        }
+    }
+
private class MaintThread extends Thread {
private volatile boolean bShutDown;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
index 8399da4dac..3720a98ecb 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -133,13 +133,16 @@ public abstract class BaseSender implements ConfigHolder {
if (!senderStatus.compareAndSet(currentStatus, SENDER_STATUS_CLOSED)) {
return;
}
+ long startTime = System.currentTimeMillis();
+ logger.info("Sender({}) instance is stopping...", senderId);
configManager.shutDown();
clientMgr.stop();
metricHolder.close();
if (this.senderFactory != null) {
this.senderFactory.removeClient(this);
}
- logger.info("Sender({}) instance stopped!", senderId);
+ logger.info("Sender({}) instance stopped, cost {} ms!",
+ senderId, System.currentTimeMillis() - startTime);
}
@Override
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 7d71133330..cdcc5bc6c8 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -31,6 +31,7 @@ import java.lang.management.ManagementFactory;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -150,6 +151,38 @@ public class ProxyUtils {
return protocol + ":" + regionName + ":" + groupId;
}
+    /**
+     * Produces a filtered copy of the supplied attributes that keeps only usable entries.
+     *
+     * <p>An entry is dropped when its key is blank, when its key or its non-null value
+     * contains {@code AttributeConstants.SEPARATOR} or
+     * {@code AttributeConstants.KEY_VALUE_SEPARATOR}, or when its trimmed key is one of
+     * {@code SdkReservedWords}. Surviving entries are stored under the trimmed key.
+     *
+     * @param attrsMap the input attrs
+     * @return the input itself when it is {@code null} or empty; otherwise a new map
+     *         holding only the valid attrs
+     */
+    public static Map<String, String> getValidAttrs(Map<String, String> attrsMap) {
+        if (attrsMap == null || attrsMap.isEmpty()) {
+            return attrsMap;
+        }
+        Map<String, String> filtered = new HashMap<>();
+        for (Map.Entry<String, String> attr : attrsMap.entrySet()) {
+            String rawKey = attr.getKey();
+            String attrValue = attr.getValue();
+            boolean keyUsable = !StringUtils.isBlank(rawKey)
+                    && !rawKey.contains(AttributeConstants.SEPARATOR)
+                    && !rawKey.contains(AttributeConstants.KEY_VALUE_SEPARATOR);
+            if (!keyUsable) {
+                continue;
+            }
+            String trimmedKey = rawKey.trim();
+            if (ProxyUtils.SdkReservedWords.contains(trimmedKey)) {
+                continue;
+            }
+            // a null value is accepted as-is; only non-null values are checked for separators
+            boolean valueUsable = attrValue == null
+                    || !(attrValue.contains(AttributeConstants.SEPARATOR)
+                            || attrValue.contains(AttributeConstants.KEY_VALUE_SEPARATOR));
+            if (valueUsable) {
+                filtered.put(trimmedKey, attrValue);
+            }
+        }
+        return filtered;
+    }
+
public static boolean isAttrKeysValid(Map<String, String> attrsMap) {
if (attrsMap == null || attrsMap.size() == 0) {
return false;