This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new bbe4cb7 [ISSUE #124]Some sink /source metric statistics are incorrect
(#126)
bbe4cb7 is described below
commit bbe4cb780f8171a87ec4f27ba4f9183ab09fccea
Author: zhangjidi2016 <[email protected]>
AuthorDate: Wed May 18 13:49:17 2022 +0800
[ISSUE #124]Some sink /source metric statistics are incorrect (#126)
* [ISSUE #124]Some sink /source metric statistics are incorrect
* add source poll times and sink read times statistical
Co-authored-by: zhangjidi <[email protected]>
---
.../runtime/connectorwrapper/WorkerSinkTask.java | 5 ++++-
.../runtime/connectorwrapper/WorkerSourceTask.java | 9 +++++---
.../connect/runtime/stats/ConnectStatsManager.java | 26 +++++++++++++++++-----
3 files changed, 30 insertions(+), 10 deletions(-)
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index dd6b8bc..8f95747 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -233,6 +233,9 @@ public class WorkerSinkTask implements WorkerTask {
log.error(" sink task {},pull message MQClientException,
Error {} ", this, e.getMessage(), e);
connectStatsManager.incSinkRecordPutTotalFailNums();
connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+ } finally {
+ // record sink read times
+ connectStatsManager.incSinkRecordReadTotalTimes();
}
}
@@ -421,7 +424,7 @@ public class WorkerSinkTask implements WorkerTask {
if (null != pullResult &&
pullResult.getPullStatus().equals(PullStatus.FOUND)) {
this.incPullTPS(entry.getKey().getTopic(),
pullResult.getMsgFoundList().size());
messages = pullResult.getMsgFoundList();
- connectStatsManager.incSinkRecordReadTotalNums();
+
connectStatsManager.incSinkRecordReadTotalNums(messages.size());
connectStatsManager.incSinkRecordReadNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID),
messages.size());
long pullRT = System.currentTimeMillis() -
beginPullMsgTimestamp;
connectStatsManager.incSinkRecordReadTotalRT(pullRT);
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 955d770..4c7eee8 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -179,8 +179,8 @@ public class WorkerSourceTask implements WorkerTask {
try {
toSendRecord = poll();
if (null != toSendRecord && toSendRecord.size() > 0) {
- connectStatsManager.incSourceRecordPollTotalNums();
-
connectStatsManager.incSourceRecordPollNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+
connectStatsManager.incSourceRecordPollTotalNums(toSendRecord.size());
+
connectStatsManager.incSourceRecordPollNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID),
toSendRecord.size());
sendRecord();
}
} catch (RetriableException e) {
@@ -190,8 +190,11 @@ public class WorkerSourceTask implements WorkerTask {
} catch (Exception e) {
connectStatsManager.incSourceRecordPollTotalFailNums();
connectStatsManager.incSourceRecordPollFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
- log.error("Source task RetriableException exception",
e);
+ log.error("Source task Exception exception", e);
state.set(WorkerTaskState.ERROR);
+ } finally {
+ // record source poll times
+ connectStatsManager.incSourceRecordPollTotalTimes();
}
}
AtomicLong atomicLong =
connectStatsService.singleSourceTaskTimesTotal(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
index b7a03e9..72e10bf 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
@@ -68,6 +68,9 @@ public class ConnectStatsManager {
public static final String SINK_RECORD_PUT_FAIL_RT =
"SINK_RECORD_PUT_FAIL_RT";
public static final String SINK_RECORD_PUT_TOTAL_FAIL_RT =
"SINK_RECORD_PUT_TOTAL_FAIL_RT";
+ public static final String SOURCE_RECORD_POLL_TOTAL_TIMES =
"SOURCE_RECORD_POLL_TOTAL_TIMES";
+ public static final String SINK_RECORD_READ_TOTAL_TIMES =
"SINK_RECORD_READ_TOTAL_TIMES";
+
/**
* read disk follow stats
*/
@@ -119,6 +122,9 @@ public class ConnectStatsManager {
this.statsTable.put(SINK_RECORD_PUT_TOTAL_RT, new
StatsItemSet(SINK_RECORD_PUT_TOTAL_RT, this.scheduledExecutorService, log));
this.statsTable.put(SINK_RECORD_PUT_FAIL_RT, new
StatsItemSet(SINK_RECORD_PUT_FAIL_RT, this.scheduledExecutorService, log));
this.statsTable.put(SINK_RECORD_PUT_TOTAL_FAIL_RT, new
StatsItemSet(SINK_RECORD_PUT_TOTAL_FAIL_RT, this.scheduledExecutorService,
log));
+
+ this.statsTable.put(SOURCE_RECORD_POLL_TOTAL_TIMES, new
StatsItemSet(SOURCE_RECORD_POLL_TOTAL_TIMES, this.scheduledExecutorService,
log));
+ this.statsTable.put(SINK_RECORD_READ_TOTAL_TIMES, new
StatsItemSet(SINK_RECORD_READ_TOTAL_TIMES, this.scheduledExecutorService, log));
}
public void start() {
@@ -138,16 +144,16 @@ public class ConnectStatsManager {
return null;
}
- public void incSourceRecordPollTotalNums() {
- this.statsTable.get(SOURCE_RECORD_POLL_TOTAL_NUMS).addValue(worker, 1,
1);
+ public void incSourceRecordPollTotalNums(int incValue) {
+ this.statsTable.get(SOURCE_RECORD_POLL_TOTAL_NUMS).addValue(worker,
incValue, 1);
}
- public void incSourceRecordPollNums(String taskId) {
+ public void incSourceRecordPollNums(String taskId, int incValue) {
if (StringUtils.isBlank(taskId)) {
return;
}
- this.statsTable.get(SOURCE_RECORD_POLL_NUMS).addValue(taskId, 1, 1);
+ this.statsTable.get(SOURCE_RECORD_POLL_NUMS).addValue(taskId,
incValue, 1);
}
public void incSourceRecordPollTotalFailNums() {
@@ -206,8 +212,8 @@ public class ConnectStatsManager {
this.statsTable.get(SINK_RECORD_READ_FAIL_NUMS).addValue(taskId, 1, 1);
}
- public void incSinkRecordReadTotalNums() {
- this.statsTable.get(SINK_RECORD_READ_TOTAL_NUMS).addValue(worker, 1,
1);
+ public void incSinkRecordReadTotalNums(int incValue) {
+ this.statsTable.get(SINK_RECORD_READ_TOTAL_NUMS).addValue(worker,
incValue, 1);
}
public void incSinkRecordReadNums(String taskId) {
@@ -281,4 +287,12 @@ public class ConnectStatsManager {
}
this.statsTable.get(SINK_RECORD_PUT_RT).addValue(taskId, (int) rt, 1);
}
+
+ public void incSourceRecordPollTotalTimes() {
+ this.statsTable.get(SOURCE_RECORD_POLL_TOTAL_TIMES).addValue(worker,
1, 1);
+ }
+
+ public void incSinkRecordReadTotalTimes() {
+ this.statsTable.get(SINK_RECORD_READ_TOTAL_TIMES).addValue(worker, 1,
1);
+ }
}