This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d9dc65e83 [INLONG-6985][DataProxy] Make maxMonitorCnt setting configurable (#6990)
d9dc65e83 is described below
commit d9dc65e836104f47e009f9eb11d380f1d1169e1f
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Dec 21 09:50:12 2022 +0800
[INLONG-6985][DataProxy] Make maxMonitorCnt setting configurable (#6990)
---
.../conf/dataproxy-mulit-pulsar-http-example.conf | 7 ++++++-
.../conf/dataproxy-mulit-pulsar-udp-example.conf | 5 +++++
inlong-dataproxy/conf/dataproxy-tubemq.conf | 12 ++++++++++++
inlong-dataproxy/conf/dataproxy.conf | 7 +++++++
.../apache/inlong/dataproxy/consts/ConfigConstants.java | 1 +
.../org/apache/inlong/dataproxy/http/HttpBaseSource.java | 13 +++++++++----
.../java/org/apache/inlong/dataproxy/sink/PulsarSink.java | 14 ++++++++++----
.../java/org/apache/inlong/dataproxy/sink/TubeSink.java | 14 ++++++++++----
.../org/apache/inlong/dataproxy/source/BaseSource.java | 11 ++++++++++-
9 files changed, 70 insertions(+), 14 deletions(-)
diff --git a/inlong-dataproxy/conf/dataproxy-mulit-pulsar-http-example.conf
b/inlong-dataproxy/conf/dataproxy-mulit-pulsar-http-example.conf
index b744ecefb..56d2d7a85 100644
--- a/inlong-dataproxy/conf/dataproxy-mulit-pulsar-http-example.conf
+++ b/inlong-dataproxy/conf/dataproxy-mulit-pulsar-http-example.conf
@@ -46,6 +46,7 @@ agent1.sources.http-source.metric-recovery-path =
./data/file/recovery
agent1.sources.http-source.metric-agent-port=8003
agent1.sources.http-source.metric-cache-size=1000000
agent1.sources.http-source.set=10
+agent1.sources.http-source.max-monitor-cnt=500000
agent1.channels.ch-msg1.type = memory
agent1.channels.ch-msg1.capacity = 10000
@@ -77,12 +78,16 @@ agent1.channels.ch-msg6.fsyncInterval = 10
agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
agent1.sinks.pulsar-sink-msg1.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg1.max-monitor-cnt=500000
agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
agent1.sinks.pulsar-sink-msg2.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg2.max-monitor-cnt=500000
agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
agent1.sinks.pulsar-sink-msg5.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg5.max-monitor-cnt=500000
agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
-agent1.sinks.pulsar-sink-msg6.type =
org.apache.inlong.dataproxy.sink.PulsarSink
\ No newline at end of file
+agent1.sinks.pulsar-sink-msg6.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg6.max-monitor-cnt=500000
\ No newline at end of file
diff --git a/inlong-dataproxy/conf/dataproxy-mulit-pulsar-udp-example.conf
b/inlong-dataproxy/conf/dataproxy-mulit-pulsar-udp-example.conf
index 018b02bc1..827db35a1 100644
--- a/inlong-dataproxy/conf/dataproxy-mulit-pulsar-udp-example.conf
+++ b/inlong-dataproxy/conf/dataproxy-mulit-pulsar-udp-example.conf
@@ -46,6 +46,7 @@ agent1.sources.upd-source.metric-recovery-path =
./data/file/recovery
agent1.sources.upd-source.metric-agent-port = 8003
agent1.sources.upd-source.metric-cache-size = 1000000
agent1.sources.upd-source.set = 10
+agent1.sources.upd-source.max-monitor-cnt=500000
agent1.channels.ch-msg1.type = memory
agent1.channels.ch-msg1.capacity = 10000
@@ -77,12 +78,16 @@ agent1.channels.ch-msg6.fsyncInterval = 10
agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
agent1.sinks.pulsar-sink-msg1.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg1.max-monitor-cnt=500000
agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
agent1.sinks.pulsar-sink-msg2.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg2.max-monitor-cnt=500000
agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
agent1.sinks.pulsar-sink-msg5.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg5.max-monitor-cnt=500000
agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
agent1.sinks.pulsar-sink-msg6.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg6.max-monitor-cnt=500000
diff --git a/inlong-dataproxy/conf/dataproxy-tubemq.conf
b/inlong-dataproxy/conf/dataproxy-tubemq.conf
index 67f2e7fcf..9e4180165 100644
--- a/inlong-dataproxy/conf/dataproxy-tubemq.conf
+++ b/inlong-dataproxy/conf/dataproxy-tubemq.conf
@@ -51,6 +51,7 @@ agent1.sources.tcp-source.set=10
agent1.sources.tcp-source.old-metric-on=true
agent1.sources.tcp-source.new-metric-on=true
agent1.sources.tcp-source.metric_topic_prefix=manager_tmertic
+agent1.sources.tcp-source.max-monitor-cnt=500000
# http-source
agent1.sources.http-source.channels = ch-msg1 ch-msg2 ch-msg3 ch-msg5 ch-msg6
ch-msg7 ch-msg8 ch-msg9 ch-msg10 ch-back
@@ -76,6 +77,7 @@ agent1.sources.http-source.set=10
agent1.sources.http-source.old-metric-on=true
agent1.sources.http-source.new-metric-on=true
agent1.sources.http-source.metric_topic_prefix=manager_tmertic
+agent1.sources.http-source.max-monitor-cnt=500000
agent1.channels.ch-back.type = memory
agent1.channels.ch-back.capacity = 10000000
@@ -154,39 +156,49 @@ agent1.channels.ch-msg10.fsyncInterval = 5
agent1.sinks.meta-sink-msg1.channel = ch-msg1
agent1.sinks.meta-sink-msg1.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.meta-sink-msg1.maxThreads = 1
+agent1.sinks.meta-sink-msg1.max-monitor-cnt=500000
agent1.sinks.meta-sink-msg2.channel = ch-msg2
agent1.sinks.meta-sink-msg2.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.meta-sink-msg2.maxThreads = 1
+agent1.sinks.meta-sink-msg2.max-monitor-cnt=500000
agent1.sinks.meta-sink-msg3.channel = ch-msg3
agent1.sinks.meta-sink-msg3.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.meta-sink-msg3.maxThreads = 1
+agent1.sinks.meta-sink-msg3.max-monitor-cnt=500000
agent1.sinks.meta-sink-msg5.channel = ch-msg5
agent1.sinks.meta-sink-msg5.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.meta-sink-msg5.maxThreads = 1
+agent1.sinks.meta-sink-msg5.max-monitor-cnt=500000
agent1.sinks.meta-sink-msg6.channel = ch-msg6
agent1.sinks.meta-sink-msg6.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.meta-sink-msg6.maxThreads = 1
+agent1.sinks.meta-sink-msg6.max-monitor-cnt=500000
agent1.sinks.meta-sink-msg7.channel = ch-msg7
agent1.sinks.meta-sink-msg7.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.meta-sink-msg7.maxThreads = 1
+agent1.sinks.meta-sink-msg7.max-monitor-cnt=500000
agent1.sinks.meta-sink-msg8.channel = ch-msg8
agent1.sinks.meta-sink-msg8.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.meta-sink-msg8.maxThreads = 1
+agent1.sinks.meta-sink-msg8.max-monitor-cnt=500000
agent1.sinks.meta-sink-msg9.channel = ch-msg9
agent1.sinks.meta-sink-msg9.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.meta-sink-msg9.maxThreads = 1
+agent1.sinks.meta-sink-msg9.max-monitor-cnt=500000
agent1.sinks.meta-sink-msg10.channel = ch-msg10
agent1.sinks.meta-sink-msg10.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.meta-sink-msg10.maxThreads = 1
+agent1.sinks.meta-sink-msg10.max-monitor-cnt=500000
agent1.sinks.meta-sink-back.channel = ch-back
agent1.sinks.meta-sink-back.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.meta-sink-back.maxThreads = 1
+agent1.sinks.meta-sink-back.max-monitor-cnt=500000
\ No newline at end of file
diff --git a/inlong-dataproxy/conf/dataproxy.conf
b/inlong-dataproxy/conf/dataproxy.conf
index d815193e3..c44e4525d 100644
--- a/inlong-dataproxy/conf/dataproxy.conf
+++ b/inlong-dataproxy/conf/dataproxy.conf
@@ -58,6 +58,7 @@ agent1.sources.tcp-source.metric-recovery-path =
./data/file/recovery
agent1.sources.tcp-source.metric-agent-port = 8003
agent1.sources.tcp-source.metric-cache-size = 1000000
agent1.sources.tcp-source.set = 10
+agent1.sources.tcp-source.max-monitor-cnt=500000
# http-source
agent1.sources.http-source.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
@@ -80,6 +81,7 @@ agent1.sources.http-source.metric-recovery-path =
./data/file/recovery
agent1.sources.http-source.metric-agent-port=8003
agent1.sources.http-source.metric-cache-size=1000000
agent1.sources.http-source.set=10
+agent1.sources.http-source.max-monitor-cnt=500000
agent1.channels.ch-msg1.type = memory
agent1.channels.ch-msg1.capacity = 50000
@@ -118,20 +120,25 @@ agent1.channels.ch-msg6.fsyncInterval = 10
agent1.sinks.mq-sink-msg1.channel = ch-msg1
agent1.sinks.mq-sink-msg1.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.mq-sink-msg1.maxThreads = 1
+agent1.sinks.mq-sink-msg1.max-monitor-cnt=500000
agent1.sinks.mq-sink-msg2.channel = ch-msg2
agent1.sinks.mq-sink-msg2.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.mq-sink-msg2.maxThreads = 1
+agent1.sinks.mq-sink-msg2.max-monitor-cnt=500000
# For order message
agent1.sinks.mq-sink-msg3.channel = ch-msg3
agent1.sinks.mq-sink-msg3.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.mq-sink-msg3.maxThreads = 1
+agent1.sinks.mq-sink-msg3.max-monitor-cnt=500000
agent1.sinks.mq-sink-msg5.channel = ch-msg5
agent1.sinks.mq-sink-msg5.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.mq-sink-msg5.maxThreads = 1
+agent1.sinks.mq-sink-msg5.max-monitor-cnt=500000
agent1.sinks.mq-sink-msg6.channel = ch-msg6
agent1.sinks.mq-sink-msg6.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
agent1.sinks.mq-sink-msg6.maxThreads = 1
+agent1.sinks.mq-sink-msg6.max-monitor-cnt=500000
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index fea9e0238..f736313e3 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -61,6 +61,7 @@ public class ConfigConstants {
public static final String STAT_INTERVAL_SEC = "stat-interval-sec";
public static final String MAX_MONITOR_CNT = "max-monitor-cnt";
+ public static final int DEF_MONITOR_STAT_CNT = 300000;
public static final String HEART_INTERVAL_SEC = "heart-interval-sec";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
index 973bc128c..7faa75d64 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
@@ -17,7 +17,6 @@
package org.apache.inlong.dataproxy.http;
-import static
org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
import com.google.common.base.Preconditions;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
@@ -54,7 +53,7 @@ public class HttpBaseSource extends AbstractSource implements
EventDrivenSource,
protected MonitorIndex monitorIndex = null;
protected MonitorIndexExt monitorIndexExt = null;
private int statIntervalSec = 60;
- private int maxMonitorCnt = 300000;
+ private int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT;
// audit
protected DataProxyMetricItemSet metricItemSet;
@@ -137,8 +136,14 @@ public class HttpBaseSource extends AbstractSource
implements EventDrivenSource,
// get statistic interval
statIntervalSec =
context.getInteger(ConfigConstants.STAT_INTERVAL_SEC, 60);
Preconditions.checkArgument((statIntervalSec >= 0), "statIntervalSec
must be >= 0");
- // get max monitor record count
- maxMonitorCnt = context.getInteger(MAX_MONITOR_CNT, 300000);
+ // get maxMonitorCnt's configure value
+ try {
+ maxMonitorCnt = context.getInteger(
+ ConfigConstants.MAX_MONITOR_CNT,
ConfigConstants.DEF_MONITOR_STAT_CNT);
+ } catch (NumberFormatException e) {
+ logger.warn("Property {} must specify an integer value: {}",
+ ConfigConstants.MAX_MONITOR_CNT,
context.getString(ConfigConstants.MAX_MONITOR_CNT));
+ }
Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be
>= 0");
customProcessor =
context.getBoolean(ConfigConstants.CUSTOM_CHANNEL_PROCESSOR, false);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 9891987a2..25cf65417 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -18,7 +18,6 @@
package org.apache.inlong.dataproxy.sink;
import static org.apache.inlong.dataproxy.consts.AttrConstants.SEP_HASHTAG;
-import static
org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
@@ -149,7 +148,7 @@ public class PulsarSink extends AbstractSink implements
Configurable, SendMessag
private RateLimiter diskRateLimiter;
private long t1 = System.currentTimeMillis();
- private int maxMonitorCnt = 300000;
+ private int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT;
/*
* Control whether the SinkRunner thread can read data from the Channel
*/
@@ -193,8 +192,15 @@ public class PulsarSink extends AbstractSink implements
Configurable, SendMessag
@Override
public void configure(Context context) {
logger.info("PulsarSink started and context = {}", context.toString());
- maxMonitorCnt = context.getInteger(MAX_MONITOR_CNT, 300000);
-
+ // get maxMonitorCnt's configure value
+ try {
+ maxMonitorCnt = context.getInteger(
+ ConfigConstants.MAX_MONITOR_CNT,
ConfigConstants.DEF_MONITOR_STAT_CNT);
+ } catch (NumberFormatException e) {
+ logger.warn("Property {} must specify an integer value: {}",
+ ConfigConstants.MAX_MONITOR_CNT,
context.getString(ConfigConstants.MAX_MONITOR_CNT));
+ }
+ Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be
>= 0");
configManager = ConfigManager.getInstance();
topicProperties = configManager.getTopicProperties();
pulsarCluster = configManager.getMqClusterUrl2Token();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index fdd15c4c0..88da9d079 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -18,7 +18,6 @@
package org.apache.inlong.dataproxy.sink;
import static org.apache.inlong.dataproxy.consts.AttrConstants.SEP_HASHTAG;
-import static
org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
import com.google.common.base.Preconditions;
import java.util.HashSet;
@@ -84,7 +83,7 @@ public class TubeSink extends AbstractSink implements
Configurable {
private String usedMasterAddr = null;
private Set<String> masterHostAndPortLists;
// statistic info log
- private int maxMonitorCnt = 300000;
+ private int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT;
private int statIntervalSec = 60;
private MonitorIndex monitorIndex;
private MonitorIndexExt monitorIndexExt;
@@ -137,8 +136,15 @@ public class TubeSink extends AbstractSink implements
Configurable {
// start message deduplication handler
MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(),
tubeConfig.getMaxSurvivedTime(),
tubeConfig.getMaxSurvivedSize());
- // get statistic configure items
- maxMonitorCnt = context.getInteger(MAX_MONITOR_CNT, 300000);
+ // get maxMonitorCnt's configure value
+ try {
+ maxMonitorCnt = context.getInteger(
+ ConfigConstants.MAX_MONITOR_CNT,
ConfigConstants.DEF_MONITOR_STAT_CNT);
+ } catch (NumberFormatException e) {
+ logger.warn("Property {} must specify an integer value: {}",
+ ConfigConstants.MAX_MONITOR_CNT,
context.getString(ConfigConstants.MAX_MONITOR_CNT));
+ }
+ Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be
>= 0");
statIntervalSec = tubeConfig.getStatIntervalSec();
Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec
must be >= 0");
// initial TubeMQ configure
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index af17bc6ff..64612e79e 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -118,7 +118,7 @@ public abstract class BaseSource
private static String HOST_DEFAULT_VALUE = "0.0.0.0";
- private static int maxMonitorCnt = 300000;
+ private static int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT;
private static int DEFAULT_MAX_CONNECTIONS = 5000;
@@ -285,6 +285,15 @@ public abstract class BaseSource
logger.warn("Simple TCP Source max-threads property must specify
an integer value. {}",
context.getString(ConfigConstants.MAX_THREADS));
}
+ // get maxMonitorCnt's configure value
+ try {
+ maxMonitorCnt = context.getInteger(
+ ConfigConstants.MAX_MONITOR_CNT,
ConfigConstants.DEF_MONITOR_STAT_CNT);
+ } catch (NumberFormatException e) {
+ logger.warn("Property {} must specify an integer value: {}",
+ ConfigConstants.MAX_MONITOR_CNT,
context.getString(ConfigConstants.MAX_MONITOR_CNT));
+ }
+ Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be
>= 0");
receiveBufferSize =
context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE,
RECEIVE_BUFFER_DEFAULT_SIZE);
if (receiveBufferSize > RECEIVE_BUFFER_MAX_SIZE) {