This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 01a6c7d  [fix]Fix npe exception caused by closing jmx (#31)
01a6c7d is described below

commit 01a6c7d96581ed1adb08d684b1fbb9e861a2bd7b
Author: wudongliang <[email protected]>
AuthorDate: Fri Jun 21 17:58:35 2024 +0800

    [fix]Fix npe exception caused by closing jmx (#31)
---
 .../connector/metrics/DorisConnectMonitor.java     | 34 ++++++++++++++++------
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git 
a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
 
b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
index e5d1bd4..c3b28a2 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
@@ -46,6 +46,7 @@ public class DorisConnectMonitor {
     private Histogram partitionBufferCountHistogram;
     private final AtomicLong buffMemoryUsage;
     private final int taskId;
+    private final boolean enableCustomJMX;
 
     public DorisConnectMonitor(
             final boolean enableCustomJMXConfig,
@@ -59,7 +60,8 @@ public class DorisConnectMonitor {
 
         this.buffMemoryUsage = new AtomicLong(0);
         this.taskId = taskId;
-        if (enableCustomJMXConfig) {
+        this.enableCustomJMX = enableCustomJMXConfig;
+        if (this.enableCustomJMX) {
             registerJMXMetrics(metricsJmxReporter);
             LOG.info("init DorisConnectMonitor, taskId={}", taskId);
         }
@@ -134,31 +136,45 @@ public class DorisConnectMonitor {
     }
 
     public void setCommittedOffset(long committedOffset) {
-        this.committedOffset.set(committedOffset);
+        if (enableCustomJMX) {
+            this.committedOffset.set(committedOffset);
+        }
     }
 
     public void addAndGetLoadCount() {
-        this.totalLoadCount.getAndIncrement();
+        if (enableCustomJMX) {
+            this.totalLoadCount.getAndIncrement();
+        }
     }
 
     public void addAndGetTotalNumberOfRecord(long totalNumberOfRecord) {
-        this.totalNumberOfRecord.addAndGet(totalNumberOfRecord);
+        if (enableCustomJMX) {
+            this.totalNumberOfRecord.addAndGet(totalNumberOfRecord);
+        }
     }
 
     public void addAndGetTotalSizeOfData(long totalSizeOfData) {
-        this.totalSizeOfData.addAndGet(totalSizeOfData);
+        if (enableCustomJMX) {
+            this.totalSizeOfData.addAndGet(totalSizeOfData);
+        }
     }
 
     public void addAndGetBuffMemoryUsage(long memoryUsage) {
-        this.buffMemoryUsage.addAndGet(memoryUsage);
+        if (enableCustomJMX) {
+            this.buffMemoryUsage.addAndGet(memoryUsage);
+        }
     }
 
     public void resetMemoryUsage() {
-        this.buffMemoryUsage.set(0L);
+        if (enableCustomJMX) {
+            this.buffMemoryUsage.set(0L);
+        }
     }
 
     public void updateBufferMetrics(long bufferSizeBytes, int numOfRecords) {
-        partitionBufferSizeBytesHistogram.update(bufferSizeBytes);
-        partitionBufferCountHistogram.update(numOfRecords);
+        if (enableCustomJMX) {
+            partitionBufferSizeBytesHistogram.update(bufferSizeBytes);
+            partitionBufferCountHistogram.update(numOfRecords);
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to