This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 58fe6ee3c8 [INLONG-11459][SDK] Add MetricConfig class to save
metric-related settings (#11460)
58fe6ee3c8 is described below
commit 58fe6ee3c8b5e76949acc79b00e9c2d9031acf05
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Nov 5 18:55:46 2024 +0800
[INLONG-11459][SDK] Add MetricConfig class to save metric-related settings
(#11460)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 72 +++----------
.../inlong/sdk/dataproxy/metric/MetricConfig.java | 118 +++++++++++++++++++++
.../inlong/sdk/dataproxy/network/Sender.java | 38 ++++---
.../sdk/dataproxy/threads/MetricWorkerThread.java | 57 +++++-----
4 files changed, 181 insertions(+), 104 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index f866b4b76d..d74f876fab 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sdk.dataproxy;
+import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.network.Utils;
@@ -55,8 +56,8 @@ public class ProxyClientConfig {
private String protocolType;
private boolean enableSaveManagerVIps = false;
-
- private boolean enableSlaMetric = false;
+ // metric configuration
+ private MetricConfig metricConfig = new MetricConfig();
private int managerConnectionTimeout = 10000;
private boolean readProxyIPFromLocal = false;
@@ -77,20 +78,8 @@ public class ProxyClientConfig {
// interval for async worker in microseconds.
private int asyncWorkerInterval = 500;
private boolean cleanHttpCacheWhenClosing = false;
-
- // config for metric collector
- // whether use groupId as key for metric, default is true
- private boolean useGroupIdAsKey = true;
- // whether use StreamId as key for metric, default is true
- private boolean useStreamIdAsKey = true;
- // whether use localIp as key for metric, default is true
- private boolean useLocalIpAsKey = true;
- // metric collection interval, default is 1 mins in milliseconds.
- private int metricIntervalInMs = 60 * 1000;
// max cache time for proxy config.
private long maxProxyCacheTimeInMs = 30 * 60 * 1000;
- // metric groupId
- private String metricGroupId = "inlong_sla_metric";
private int ioThreadNum = Runtime.getRuntime().availableProcessors();
private boolean enableBusyWait = false;
@@ -446,46 +435,6 @@ public class ProxyClientConfig {
this.cleanHttpCacheWhenClosing = cleanHttpCacheWhenClosing;
}
- public boolean isUseGroupIdAsKey() {
- return useGroupIdAsKey;
- }
-
- public void setUseGroupIdAsKey(boolean useGroupIdAsKey) {
- this.useGroupIdAsKey = useGroupIdAsKey;
- }
-
- public boolean isUseStreamIdAsKey() {
- return useStreamIdAsKey;
- }
-
- public void setUseStreamIdAsKey(boolean useStreamIdAsKey) {
- this.useStreamIdAsKey = useStreamIdAsKey;
- }
-
- public boolean isUseLocalIpAsKey() {
- return useLocalIpAsKey;
- }
-
- public void setUseLocalIpAsKey(boolean useLocalIpAsKey) {
- this.useLocalIpAsKey = useLocalIpAsKey;
- }
-
- public int getMetricIntervalInMs() {
- return metricIntervalInMs;
- }
-
- public void setMetricIntervalInMs(int metricIntervalInMs) {
- this.metricIntervalInMs = metricIntervalInMs;
- }
-
- public String getMetricGroupId() {
- return metricGroupId;
- }
-
- public void setMetricGroupId(String metricGroupId) {
- this.metricGroupId = metricGroupId;
- }
-
public long getMaxProxyCacheTimeInMs() {
return maxProxyCacheTimeInMs;
}
@@ -502,12 +451,19 @@ public class ProxyClientConfig {
this.managerConnectionTimeout = managerConnectionTimeout;
}
- public boolean isEnableSlaMetric() {
- return enableSlaMetric;
+ public MetricConfig getMetricConfig() {
+ return metricConfig;
}
- public void setEnableSlaMetric(boolean enableSlaMetric) {
- this.enableSlaMetric = enableSlaMetric;
+ public boolean isEnableMetric() {
+ return metricConfig.isEnableMetric();
+ }
+
+ public void setMetricConfig(MetricConfig metricConfig) {
+ if (metricConfig == null) {
+ return;
+ }
+ this.metricConfig = metricConfig;
}
public int getIoThreadNum() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java
new file mode 100644
index 0000000000..2a9543af29
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.metric;
+
+import org.apache.commons.lang.StringUtils;
+
+public class MetricConfig { // mutable holder for the SDK's metric-related settings
+
+ private static final long DEF_METRIC_REPORT_INTVL_MS = 60000L; // default report interval: 60 s
+ private static final long MIN_METRIC_REPORT_INTVL_MS = 30000L; // smallest report interval the setter accepts: 30 s
+ private static final long DEF_METRIC_DATE_FORMAT_MS = 60000L; // default date-format interval: 60 s
+ private static final long MIN_METRIC_DATE_FORMAT_MS = 1L; // smallest date-format interval the setter accepts: 1 ms
+ private static final String DEF_METRIC_REPORT_GROUP_ID =
"inlong_sla_metric";
+ // whether metric collection is enabled, default is false
+ private boolean enableMetric = false;
+ // whether to use groupId as part of the metric key, default is true
+ private boolean useGroupIdAsKey = true;
+ // whether to use streamId as part of the metric key, default is true
+ private boolean useStreamIdAsKey = true;
+ // whether to use localIp as part of the metric key, default is true
+ private boolean useLocalIpAsKey = true;
+ // metric report interval in milliseconds, default is 1 minute
+ private long metricRptIntvlMs = DEF_METRIC_REPORT_INTVL_MS;
+ // metric date-format interval in milliseconds, default is 1 minute;
+ // MetricWorkerThread#getFormatKeyTime rounds key times down to a multiple of it
+ private long dateFormatIntvlMs = DEF_METRIC_DATE_FORMAT_MS;
+ // groupId used for metric reports, default is "inlong_sla_metric"
+ private String metricGroupId = DEF_METRIC_REPORT_GROUP_ID;
+
+ public MetricConfig() {
+
+ }
+
+ public void setEnableMetric(boolean enableMetric) {
+ this.enableMetric = enableMetric;
+ }
+
+ public boolean isEnableMetric() {
+ return enableMetric;
+ }
+
+ public void setMetricKeyBuildParams( // sets, in one call, which fields take part in the metric key
+ boolean useGroupIdAsKey, boolean useStreamIdAsKey, boolean
useLocalIpAsKey) {
+ this.useGroupIdAsKey = useGroupIdAsKey;
+ this.useStreamIdAsKey = useStreamIdAsKey;
+ this.useLocalIpAsKey = useLocalIpAsKey;
+ }
+
+ public boolean isUseGroupIdAsKey() {
+ return useGroupIdAsKey;
+ }
+
+ public boolean isUseStreamIdAsKey() {
+ return useStreamIdAsKey;
+ }
+
+ public boolean isUseLocalIpAsKey() {
+ return useLocalIpAsKey;
+ }
+
+ public void setMetricRptIntvlMs(long metricRptIntvlMs) { // values below MIN_METRIC_REPORT_INTVL_MS are silently ignored
+ if (metricRptIntvlMs >= MIN_METRIC_REPORT_INTVL_MS) {
+ this.metricRptIntvlMs = metricRptIntvlMs;
+ }
+ }
+
+ public long getMetricRptIntvlMs() {
+ return metricRptIntvlMs;
+ }
+
+ public void setDateFormatIntvlMs(long dateFormatIntvlMs) { // values below MIN_METRIC_DATE_FORMAT_MS are silently ignored
+ if (dateFormatIntvlMs >= MIN_METRIC_DATE_FORMAT_MS) {
+ this.dateFormatIntvlMs = dateFormatIntvlMs;
+ }
+ }
+
+ public long getDateFormatIntvlMs() {
+ return dateFormatIntvlMs;
+ }
+
+ public String getMetricGroupId() {
+ return metricGroupId;
+ }
+
+ public void setMetricGroupId(String metricGroupId) { // null or blank values are silently ignored; the current groupId is kept
+ if (StringUtils.isNotBlank(metricGroupId)) {
+ this.metricGroupId = metricGroupId;
+ }
+ }
+
+ @Override
+ public String toString() { // renders every setting as "MetricConfig{name=value, ...}"
+ final StringBuilder sb = new StringBuilder("MetricConfig{");
+ sb.append("enableMetric=").append(enableMetric);
+ sb.append(", useGroupIdAsKey=").append(useGroupIdAsKey);
+ sb.append(", useStreamIdAsKey=").append(useStreamIdAsKey);
+ sb.append(", useLocalIpAsKey=").append(useLocalIpAsKey);
+ sb.append(", metricRptIntvlMs=").append(metricRptIntvlMs);
+ sb.append(", dateFormatIntvlMs=").append(dateFormatIntvlMs);
+ sb.append(", metricGroupId='").append(metricGroupId).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index d68a0c2330..9581da1f80 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -64,7 +64,7 @@ public class Sender {
private final ClientMgr clientMgr;
private final ProxyClientConfig configure;
private final boolean isFile;
- private final MetricWorkerThread metricWorker;
+ private MetricWorkerThread metricWorker = null;
private int clusterId = -1;
public Sender(ProxyClientConfig configure) throws Exception {
@@ -102,8 +102,11 @@ public class Sender {
scanThread = new TimeoutScanThread(callbacks, currentBufferSize,
configure, clientMgr);
scanThread.start();
- metricWorker = new MetricWorkerThread(configure, this);
- metricWorker.start();
+ if (configure.isEnableMetric()) {
+ metricWorker = new MetricWorkerThread(configure, this);
+ metricWorker.start();
+ }
+
LOGGER.info("proxy sdk is starting!");
}
@@ -130,7 +133,9 @@ public class Sender {
scanThread.shutDown();
clientMgr.shutDown();
threadPool.shutdown();
- metricWorker.close();
+ if (configure.isEnableMetric()) {
+ metricWorker.close();
+ }
}
public String getExceptionStack(Throwable e) {
@@ -227,8 +232,11 @@ public class Sender {
* @return
*/
public SendResult syncSendMessage(EncodeObject encodeObject, String
msgUUID, long timeout, TimeUnit timeUnit) {
- metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(), encodeObject.getStreamId(),
- Utils.getLocalIp(), encodeObject.getDt(),
encodeObject.getPackageTime(), encodeObject.getRealCnt());
+ if (configure.isEnableMetric()) {
+ metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(),
+ encodeObject.getStreamId(), Utils.getLocalIp(),
encodeObject.getDt(),
+ encodeObject.getPackageTime(), encodeObject.getRealCnt());
+ }
NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(),
encodeObject);
SendResult message = null;
try {
@@ -272,7 +280,9 @@ public class Sender {
scanThread.resetTimeoutChannel(client.getChannel());
}
if (message == SendResult.OK) {
- metricWorker.recordSuccessByMessageId(encodeObject.getMessageId());
+ if (configure.isEnableMetric()) {
+
metricWorker.recordSuccessByMessageId(encodeObject.getMessageId());
+ }
}
return message;
}
@@ -510,12 +520,12 @@ public class Sender {
*/
public void asyncSendMessage(EncodeObject encodeObject,
SendMessageCallback callback, String msgUUID,
long timeout, TimeUnit timeUnit) throws ProxysdkException {
- metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(),
- encodeObject.getStreamId(), Utils.getLocalIp(),
encodeObject.getPackageTime(),
- encodeObject.getDt(), encodeObject.getRealCnt());
-
+ if (configure.isEnableMetric()) {
+ metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(),
+ encodeObject.getStreamId(), Utils.getLocalIp(),
encodeObject.getPackageTime(),
+ encodeObject.getDt(), encodeObject.getRealCnt());
+ }
// send message package time
-
NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(),
encodeObject);
if (client == null) {
throw new ProxysdkException(SendResult.NO_CONNECTION.toString());
@@ -585,7 +595,9 @@ public class Sender {
SyncMessageCallable callable = syncCallables.remove(messageId);
SendResult result = response.getSendResult();
if (result == SendResult.OK) {
- metricWorker.recordSuccessByMessageId(messageId);
+ if (configure.isEnableMetric()) {
+ metricWorker.recordSuccessByMessageId(messageId);
+ }
} else {
LOGGER.error("{} exception happens, error message {}", channel,
response.getErrMsg());
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index ac6da06c61..270531bf5b 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -22,6 +22,7 @@ import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.FileCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.metric.MessageRecord;
+import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
import org.apache.inlong.sdk.dataproxy.metric.MetricTimeNumSummary;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
@@ -40,38 +41,30 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class MetricWorkerThread extends Thread implements Closeable {
+ private static final long DEF_METRIC_DELAY_TIME_MS = 20 * 1000L;
private static final String DEFAULT_KEY_ITEM = "";
private static final String DEFAULT_KEY_SPLITTER = "#";
private final Logger logger =
LoggerFactory.getLogger(MetricWorkerThread.class);
private final SequentialID idGenerator = new
SequentialID(Utils.getLocalIp());
-
private final ConcurrentHashMap<String, MessageRecord> metricValueCache =
new ConcurrentHashMap<>();
-
private final ConcurrentHashMap<String, MetricTimeNumSummary>
metricPackTimeMap = new ConcurrentHashMap<>();
-
private final ConcurrentHashMap<String, MetricTimeNumSummary> metricDtMap
= new ConcurrentHashMap<>();
-
- private final ProxyClientConfig proxyClientConfig;
-
- private final long delayTime;
+ private final MetricConfig metricConfig;
+ private final long delayTime = DEF_METRIC_DELAY_TIME_MS;
private final Sender sender;
- private final boolean enableSlaMetric;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private volatile boolean bShutdown = false;
public MetricWorkerThread(ProxyClientConfig proxyClientConfig, Sender
sender) {
- this.proxyClientConfig = proxyClientConfig;
- this.enableSlaMetric = proxyClientConfig.isEnableSlaMetric();
-
- this.delayTime = 20 * 1000;
+ this.metricConfig = proxyClientConfig.getMetricConfig();
this.sender = sender;
this.setDaemon(true);
this.setName("MetricWorkerThread");
}
public long getFormatKeyTime(long keyTime) {
- return keyTime - keyTime % proxyClientConfig.getMetricIntervalInMs();
+ return keyTime - keyTime % metricConfig.getDateFormatIntvlMs();
}
/**
@@ -79,9 +72,9 @@ public class MetricWorkerThread extends Thread implements
Closeable {
*/
private String getKeyStringByConfig(String groupId, String streamId,
String localIp, long keyTime) {
StringBuilder builder = new StringBuilder();
- String groupIdStr = proxyClientConfig.isUseGroupIdAsKey() ? groupId :
DEFAULT_KEY_ITEM;
- String streamIdStr = proxyClientConfig.isUseStreamIdAsKey() ? streamId
: DEFAULT_KEY_ITEM;
- String localIpStr = proxyClientConfig.isUseLocalIpAsKey() ? localIp :
DEFAULT_KEY_ITEM;
+ String groupIdStr = metricConfig.isUseGroupIdAsKey() ? groupId :
DEFAULT_KEY_ITEM;
+ String streamIdStr = metricConfig.isUseStreamIdAsKey() ? streamId :
DEFAULT_KEY_ITEM;
+ String localIpStr = metricConfig.isUseLocalIpAsKey() ? localIp :
DEFAULT_KEY_ITEM;
builder.append(groupIdStr).append(DEFAULT_KEY_SPLITTER)
.append(streamIdStr).append(DEFAULT_KEY_SPLITTER)
@@ -103,7 +96,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
*/
public void recordNumByKey(String msgId, String groupId, String streamId,
String localIp, long packTime, long dt, int num) {
- if (!enableSlaMetric) {
+ if (!metricConfig.isEnableMetric()) {
return;
}
MessageRecord messageRecord = new MessageRecord(groupId, streamId,
localIp, msgId,
@@ -127,7 +120,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
* @param msgId msg id
*/
public void recordSuccessByMessageId(String msgId) {
- if (!enableSlaMetric) {
+ if (!metricConfig.isEnableMetric()) {
return;
}
MessageRecord messageRecord = metricValueCache.remove(msgId);
@@ -176,35 +169,36 @@ public class MetricWorkerThread extends Thread implements
Closeable {
public void close() {
bShutdown = true;
flushMetric(true);
+ logger.info("MetricWorkerThread closed!");
}
@Override
public void run() {
- logger.info("MetricWorkerThread Thread=" +
Thread.currentThread().getId() + " started!");
+ logger.info("MetricWorkerThread thread=" +
Thread.currentThread().getId() + " started!");
while (!bShutdown) {
// check metric
try {
checkCacheRecords();
flushMetric(false);
-
TimeUnit.MILLISECONDS.sleep(proxyClientConfig.getMetricIntervalInMs());
- } catch (Exception ex) {
+
TimeUnit.MILLISECONDS.sleep(metricConfig.getMetricRptIntvlMs());
+ } catch (Throwable ex) {
// exception happens
}
}
+ logger.info("MetricWorkerThread thread existed!");
}
private void tryToSendMetricToManager(EncodeObject encodeObject,
MetricSendCallBack callBack) {
callBack.increaseRetry();
try {
-
if (callBack.getRetryCount() < 4) {
sender.asyncSendMessageIndex(encodeObject, callBack,
String.valueOf(System.currentTimeMillis()), 20,
TimeUnit.SECONDS);
} else {
- logger.error("error while sending {} {}",
encodeObject.getBodyBytes(), encodeObject.getBodylist());
+ logger.error("Send metric failure: {} {}",
encodeObject.getBodyBytes(), encodeObject.getBodylist());
}
- } catch (Exception ex) {
- logger.warn("exception caught {}", ex.getMessage());
+ } catch (Throwable ex) {
+ logger.warn("Send metric throw exception", ex);
tryToSendMetricToManager(encodeObject, callBack);
}
}
@@ -213,7 +207,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
EncodeObject encodeObject = new EncodeObject(line.getBytes(), 7,
false, false, false,
dtTime, idGenerator.getNextInt(),
- proxyClientConfig.getMetricGroupId(), streamId, "", "",
Utils.getLocalIp());
+ metricConfig.getMetricGroupId(), streamId, "", "",
Utils.getLocalIp());
MetricSendCallBack callBack = new MetricSendCallBack(encodeObject);
tryToSendMetricToManager(encodeObject, callBack);
}
@@ -222,7 +216,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
for (String keyName : cacheMap.keySet()) {
MetricTimeNumSummary summary = cacheMap.get(keyName);
if (isClosing || (summary != null && summary.getSummaryTime()
- + delayTime > proxyClientConfig.getMetricIntervalInMs())) {
+ + delayTime > metricConfig.getMetricRptIntvlMs())) {
summary = cacheMap.remove(keyName);
if (summary != null) {
long metricDtTime = summary.getStartCalculateTime() / 1000;
@@ -231,9 +225,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
+ DEFAULT_KEY_SPLITTER + summary.getFailedNum()
+ DEFAULT_KEY_SPLITTER + summary.getTotalNum();
String timeLine = keyName + DEFAULT_KEY_SPLITTER +
summary.getTimeString();
-
- logger.info("sending {}", countLine);
- logger.info("sending {}", timeLine);
+ logger.info("Send metric countLine={}, timeLine={}",
countLine, timeLine);
sendSingleLine(countLine, "count", metricDtTime);
sendSingleLine(timeLine, "time", metricDtTime);
}
@@ -255,8 +247,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
private void checkCacheRecords() {
for (String msgId : metricValueCache.keySet()) {
MessageRecord record = metricValueCache.get(msgId);
-
- if (record != null && record.getMessageTime() + delayTime >
proxyClientConfig.getMetricIntervalInMs()) {
+ if (record != null && record.getMessageTime() + delayTime >
metricConfig.getMetricRptIntvlMs()) {
recordFailedByMessageId(msgId);
}
}
@@ -298,7 +289,7 @@ public class MetricWorkerThread extends Thread implements
Closeable {
if (!SendResult.OK.toString().equals(result)) {
tryToSendMetricToManager(encodeObject, this);
} else {
- logger.info("metric is ok");
+ logger.debug("Send metric is ok!");
}
}