This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new ab460eb  support user-defined statsitem & upgrade rocketmq.version to 
4.7.1 (#246)
ab460eb is described below

commit ab460eb068bfafd903616aaa8c1374f4a8b97580
Author: lizhiboo <[email protected]>
AuthorDate: Mon Aug 15 16:51:10 2022 +0800

    support user-defined statsitem & upgrade rocketmq.version to 4.7.1 (#246)
---
 connectors/rocketmq-connect-debezium/pom.xml       |  2 +-
 connectors/rocketmq-connect-jdbc/pom.xml           |  2 +-
 connectors/rocketmq-replicator/pom.xml             |  2 +-
 pom.xml                                            |  2 +-
 rocketmq-connect-cli/pom.xml                       |  2 +-
 rocketmq-connect-runtime/pom.xml                   |  2 +-
 .../connect/runtime/stats/ConnectStatsManager.java | 25 ++++++++++++++++++++++
 7 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/connectors/rocketmq-connect-debezium/pom.xml 
b/connectors/rocketmq-connect-debezium/pom.xml
index 5f0e2ec..4c7d6c5 100644
--- a/connectors/rocketmq-connect-debezium/pom.xml
+++ b/connectors/rocketmq-connect-debezium/pom.xml
@@ -180,7 +180,7 @@
         <debezium.version>1.7.2.Final</debezium.version>
         <debezium.postgresql.version>42.3.3</debezium.postgresql.version>
         <!--rocketmq version-->
-        <rocketmq.version>4.7.0</rocketmq.version>
+        <rocketmq.version>4.7.1</rocketmq.version>
         <rocketmq-openmessaging.version>4.3.2</rocketmq-openmessaging.version>
 
         <!--rocketmq connect version-->
diff --git a/connectors/rocketmq-connect-jdbc/pom.xml 
b/connectors/rocketmq-connect-jdbc/pom.xml
index 097bb3a..a4face9 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -40,7 +40,7 @@
         <!-- Compiler settings properties -->
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
-        <rocketmq.version>4.7.0</rocketmq.version>
+        <rocketmq.version>4.7.1</rocketmq.version>
 
         <!--test jar-->
         <junit.version>4.13.1</junit.version>
diff --git a/connectors/rocketmq-replicator/pom.xml 
b/connectors/rocketmq-replicator/pom.xml
index 95a72f3..5146e46 100644
--- a/connectors/rocketmq-replicator/pom.xml
+++ b/connectors/rocketmq-replicator/pom.xml
@@ -67,7 +67,7 @@
     </build>
 
     <properties>
-        <rocketmq.version>4.7.0</rocketmq.version>
+        <rocketmq.version>4.7.1</rocketmq.version>
         
<openmessaging.connector.version>0.1.3</openmessaging.connector.version>
         <junit.version>4.13.1</junit.version>
         <mockito.version>3.2.4</mockito.version>
diff --git a/pom.xml b/pom.xml
index c4b4e16..7869245 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,7 @@
         </license>
     </licenses>
     <properties>
-        <rocketmq.version>4.7.0</rocketmq.version>
+        <rocketmq.version>4.7.1</rocketmq.version>
         <junit.version>4.13.1</junit.version>
         <assertj.version>3.22.0</assertj.version>
         <mockito.version>3.2.4</mockito.version>
diff --git a/rocketmq-connect-cli/pom.xml b/rocketmq-connect-cli/pom.xml
index 8b7976c..a685adc 100644
--- a/rocketmq-connect-cli/pom.xml
+++ b/rocketmq-connect-cli/pom.xml
@@ -36,7 +36,7 @@
         <maven.compiler.target>1.8</maven.compiler.target>
 
         <!-- RocketMQ Version-->
-        <rocketmq.version>4.7.0</rocketmq.version>
+        <rocketmq.version>4.7.1</rocketmq.version>
     </properties>
 
     <build>
diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml
index bd31d7c..b00929d 100644
--- a/rocketmq-connect-runtime/pom.xml
+++ b/rocketmq-connect-runtime/pom.xml
@@ -43,7 +43,7 @@
         <maven.compiler.target>1.8</maven.compiler.target>
 
         <!-- RocketMQ Version-->
-        <rocketmq.version>4.7.0</rocketmq.version>
+        <rocketmq.version>4.7.1</rocketmq.version>
 
     </properties>
 
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
index 72e10bf..c02b0d8 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.connect.runtime.stats;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -295,4 +296,28 @@ public class ConnectStatsManager {
     public void incSinkRecordReadTotalTimes() {
         this.statsTable.get(SINK_RECORD_READ_TOTAL_TIMES).addValue(worker, 1, 
1);
     }
+
+    public void initAdditionalItems(List<String> additionalItems) {
+        for (String additionalItem : additionalItems) {
+            if (this.statsTable.containsKey(additionalItem)) {
+                log.warn("Already exists statsItem : " + additionalItem + ", 
just skip");
+                continue;
+            }
+            this.statsTable.put(additionalItem, new 
StatsItemSet(additionalItem, scheduledExecutorService, log));
+        }
+    }
+
+    public void incAdditionalItem(String additionalItem, String key,  int 
incValue, int incTimes) {
+        StatsItemSet statsItemSet = this.statsTable.get(additionalItem);
+        if (statsItemSet != null) {
+            statsItemSet.addValue(key, incValue, incTimes);
+        }
+    }
+
+    public void removeAdditionalItem(String additionalItem, String key) {
+        StatsItemSet statsItemSet = this.statsTable.get(additionalItem);
+        if (statsItemSet != null) {
+            statsItemSet.delValue(key);
+        }
+    }
 }

Reply via email to