This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 01a6c7d [fix] Fix NPE thrown when custom JMX is disabled (#31)
01a6c7d is described below
commit 01a6c7d96581ed1adb08d684b1fbb9e861a2bd7b
Author: wudongliang <[email protected]>
AuthorDate: Fri Jun 21 17:58:35 2024 +0800
[fix] Fix NPE thrown when custom JMX is disabled (#31)
---
.../connector/metrics/DorisConnectMonitor.java | 34 ++++++++++++++++------
1 file changed, 25 insertions(+), 9 deletions(-)
diff --git
a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
index e5d1bd4..c3b28a2 100644
---
a/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
+++
b/src/main/java/org/apache/doris/kafka/connector/metrics/DorisConnectMonitor.java
@@ -46,6 +46,7 @@ public class DorisConnectMonitor {
private Histogram partitionBufferCountHistogram;
private final AtomicLong buffMemoryUsage;
private final int taskId;
+ private final boolean enableCustomJMX;
public DorisConnectMonitor(
final boolean enableCustomJMXConfig,
@@ -59,7 +60,8 @@ public class DorisConnectMonitor {
this.buffMemoryUsage = new AtomicLong(0);
this.taskId = taskId;
- if (enableCustomJMXConfig) {
+ this.enableCustomJMX = enableCustomJMXConfig;
+ if (this.enableCustomJMX) {
registerJMXMetrics(metricsJmxReporter);
LOG.info("init DorisConnectMonitor, taskId={}", taskId);
}
@@ -134,31 +136,45 @@ public class DorisConnectMonitor {
}
public void setCommittedOffset(long committedOffset) {
- this.committedOffset.set(committedOffset);
+ if (enableCustomJMX) {
+ this.committedOffset.set(committedOffset);
+ }
}
public void addAndGetLoadCount() {
- this.totalLoadCount.getAndIncrement();
+ if (enableCustomJMX) {
+ this.totalLoadCount.getAndIncrement();
+ }
}
public void addAndGetTotalNumberOfRecord(long totalNumberOfRecord) {
- this.totalNumberOfRecord.addAndGet(totalNumberOfRecord);
+ if (enableCustomJMX) {
+ this.totalNumberOfRecord.addAndGet(totalNumberOfRecord);
+ }
}
public void addAndGetTotalSizeOfData(long totalSizeOfData) {
- this.totalSizeOfData.addAndGet(totalSizeOfData);
+ if (enableCustomJMX) {
+ this.totalSizeOfData.addAndGet(totalSizeOfData);
+ }
}
public void addAndGetBuffMemoryUsage(long memoryUsage) {
- this.buffMemoryUsage.addAndGet(memoryUsage);
+ if (enableCustomJMX) {
+ this.buffMemoryUsage.addAndGet(memoryUsage);
+ }
}
public void resetMemoryUsage() {
- this.buffMemoryUsage.set(0L);
+ if (enableCustomJMX) {
+ this.buffMemoryUsage.set(0L);
+ }
}
public void updateBufferMetrics(long bufferSizeBytes, int numOfRecords) {
- partitionBufferSizeBytesHistogram.update(bufferSizeBytes);
- partitionBufferCountHistogram.update(numOfRecords);
+ if (enableCustomJMX) {
+ partitionBufferSizeBytesHistogram.update(bufferSizeBytes);
+ partitionBufferCountHistogram.update(numOfRecords);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]