This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new c3c06029b65 Pipe: Fixed the bug that the
PipeDataNodeRemainingEventAndTimeMetrics may generate NullPointerException
(#14015) (#14024)
c3c06029b65 is described below
commit c3c06029b6505377a16a69289294eee56256e60a
Author: Caideyipi <[email protected]>
AuthorDate: Fri Nov 8 15:36:21 2024 +0800
Pipe: Fixed the bug that the PipeDataNodeRemainingEventAndTimeMetrics may
generate NullPointerException (#14015) (#14024)
---
.../metric/PipeConfigNodeRemainingTimeMetrics.java | 9 ++++-
.../PipeConfigNodeRemainingTimeOperator.java | 5 ++-
.../event/common/heartbeat/PipeHeartbeatEvent.java | 4 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 4 +-
.../common/tablet/PipeRawTabletInsertionEvent.java | 4 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 +-
.../PipeDataNodeRemainingEventAndTimeMetrics.java | 45 +++++++++++++++-------
.../PipeDataNodeRemainingEventAndTimeOperator.java | 5 ++-
.../commons/pipe/metric/PipeRemainingOperator.java | 12 +++---
9 files changed, 59 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
index ab5440eab2d..23c43461c78 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
@@ -103,7 +103,11 @@ public class PipeConfigNodeRemainingTimeMetrics implements
IMetricSet {
// The metric is global thus the regionId is omitted
final String pipeID = extractor.getPipeName() + "_" +
extractor.getCreationTime();
remainingTimeOperatorMap
- .computeIfAbsent(pipeID, k -> new
PipeConfigNodeRemainingTimeOperator())
+ .computeIfAbsent(
+ pipeID,
+ k ->
+ new PipeConfigNodeRemainingTimeOperator(
+ extractor.getPipeName(), extractor.getCreationTime()))
.register(extractor);
if (Objects.nonNull(metricService)) {
createMetrics(pipeID);
@@ -157,7 +161,8 @@ public class PipeConfigNodeRemainingTimeMetrics implements
IMetricSet {
public double getRemainingTime(final String pipeName, final long
creationTime) {
return remainingTimeOperatorMap
.computeIfAbsent(
- pipeName + "_" + creationTime, k -> new
PipeConfigNodeRemainingTimeOperator())
+ pipeName + "_" + creationTime,
+ k -> new PipeConfigNodeRemainingTimeOperator(pipeName,
creationTime))
.getRemainingTime();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
index e53a4bb1cb2..298bc521b74 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
@@ -43,6 +43,10 @@ class PipeConfigNodeRemainingTimeOperator extends
PipeRemainingOperator {
private double lastConfigRegionCommitSmoothingValue = Long.MAX_VALUE;
+ PipeConfigNodeRemainingTimeOperator(String pipeName, long creationTime) {
+ super(pipeName, creationTime);
+ }
+
//////////////////////////// Remaining time calculation
////////////////////////////
/**
@@ -91,7 +95,6 @@ class PipeConfigNodeRemainingTimeOperator extends
PipeRemainingOperator {
//////////////////////////// Register & deregister (pipe integration)
////////////////////////////
void register(final IoTDBConfigRegionExtractor extractor) {
- setNameAndCreationTime(extractor.getPipeName(),
extractor.getCreationTime());
configRegionExtractors.add(extractor);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index e64c3e12cd6..29f770686ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -84,7 +84,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .increaseHeartbeatEventCount(pipeName + "_" + creationTime);
+ .increaseHeartbeatEventCount(pipeName, creationTime);
}
return true;
}
@@ -95,7 +95,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
// not the event copied and passed to the extractor
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .decreaseHeartbeatEventCount(pipeName + "_" + creationTime);
+ .decreaseHeartbeatEventCount(pipeName, creationTime);
if (shouldPrintMessage && LOGGER.isDebugEnabled()) {
LOGGER.debug(this.toString());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 5932c6b544b..4979a3d2777 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -136,7 +136,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
PipeDataNodeResourceManager.wal().pin(walEntryHandler);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .increaseTabletEventCount(pipeName + "_" + creationTime);
+ .increaseTabletEventCount(pipeName, creationTime);
}
return true;
} catch (final Exception e) {
@@ -169,7 +169,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .decreaseTabletEventCount(pipeName + "_" + creationTime);
+ .decreaseTabletEventCount(pipeName, creationTime);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index b89c11b1f39..24e3c7cd1d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -118,7 +118,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .increaseTabletEventCount(pipeName + "_" + creationTime);
+ .increaseTabletEventCount(pipeName, creationTime);
}
return true;
}
@@ -127,7 +127,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .decreaseTabletEventCount(pipeName + "_" + creationTime);
+ .decreaseTabletEventCount(pipeName, creationTime);
}
allocatedMemoryBlock.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 699dceea669..0a19cf2855c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -249,7 +249,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
}
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .increaseTsFileEventCount(pipeName + "_" + creationTime);
+ .increaseTsFileEventCount(pipeName, creationTime);
}
return true;
} catch (final Exception e) {
@@ -280,7 +280,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .decreaseTsFileEventCount(pipeName + "_" + creationTime);
+ .decreaseTsFileEventCount(pipeName, creationTime);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index 25c2ede2407..85be65eb9f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -122,46 +122,62 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
// The metric is global thus the regionId is omitted
final String pipeID = extractor.getPipeName() + "_" +
extractor.getCreationTime();
remainingEventAndTimeOperatorMap
- .computeIfAbsent(pipeID, k -> new
PipeDataNodeRemainingEventAndTimeOperator())
+ .computeIfAbsent(
+ pipeID,
+ k ->
+ new PipeDataNodeRemainingEventAndTimeOperator(
+ extractor.getPipeName(), extractor.getCreationTime()))
.register(extractor);
if (Objects.nonNull(metricService)) {
createMetrics(pipeID);
}
}
- public void increaseTabletEventCount(final String pipeID) {
+ public void increaseTabletEventCount(final String pipeName, final long
creationTime) {
remainingEventAndTimeOperatorMap
- .computeIfAbsent(pipeID, k -> new
PipeDataNodeRemainingEventAndTimeOperator())
+ .computeIfAbsent(
+ pipeName + "_" + creationTime,
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
.increaseTabletEventCount();
}
- public void decreaseTabletEventCount(final String pipeID) {
+ public void decreaseTabletEventCount(final String pipeName, final long
creationTime) {
remainingEventAndTimeOperatorMap
- .computeIfAbsent(pipeID, k -> new
PipeDataNodeRemainingEventAndTimeOperator())
+ .computeIfAbsent(
+ pipeName + "_" + creationTime,
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
.decreaseTabletEventCount();
}
- public void increaseTsFileEventCount(final String pipeID) {
+ public void increaseTsFileEventCount(final String pipeName, final long
creationTime) {
remainingEventAndTimeOperatorMap
- .computeIfAbsent(pipeID, k -> new
PipeDataNodeRemainingEventAndTimeOperator())
+ .computeIfAbsent(
+ pipeName + "_" + creationTime,
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
.increaseTsFileEventCount();
}
- public void decreaseTsFileEventCount(final String pipeID) {
+ public void decreaseTsFileEventCount(final String pipeName, final long
creationTime) {
remainingEventAndTimeOperatorMap
- .computeIfAbsent(pipeID, k -> new
PipeDataNodeRemainingEventAndTimeOperator())
+ .computeIfAbsent(
+ pipeName + "_" + creationTime,
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
.decreaseTsFileEventCount();
}
- public void increaseHeartbeatEventCount(final String pipeID) {
+ public void increaseHeartbeatEventCount(final String pipeName, final long
creationTime) {
remainingEventAndTimeOperatorMap
- .computeIfAbsent(pipeID, k -> new
PipeDataNodeRemainingEventAndTimeOperator())
+ .computeIfAbsent(
+ pipeName + "_" + creationTime,
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
.increaseHeartbeatEventCount();
}
- public void decreaseHeartbeatEventCount(final String pipeID) {
+ public void decreaseHeartbeatEventCount(final String pipeName, final long
creationTime) {
remainingEventAndTimeOperatorMap
- .computeIfAbsent(pipeID, k -> new
PipeDataNodeRemainingEventAndTimeOperator())
+ .computeIfAbsent(
+ pipeName + "_" + creationTime,
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
.decreaseHeartbeatEventCount();
}
@@ -237,7 +253,8 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
final String pipeName, final long creationTime) {
final PipeDataNodeRemainingEventAndTimeOperator operator =
remainingEventAndTimeOperatorMap.computeIfAbsent(
- pipeName + "_" + creationTime, k -> new
PipeDataNodeRemainingEventAndTimeOperator());
+ pipeName + "_" + creationTime,
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
return new Pair<>(operator.getRemainingEvents(),
operator.getRemainingTime());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index bee0e6975b4..4194acc9a03 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -56,6 +56,10 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
+ PipeDataNodeRemainingEventAndTimeOperator(final String pipeName, final long
creationTime) {
+ super(pipeName, creationTime);
+ }
+
//////////////////////////// Remaining event & time calculation
////////////////////////////
void increaseTabletEventCount() {
@@ -163,7 +167,6 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
//////////////////////////// Register & deregister (pipe integration)
////////////////////////////
void register(final IoTDBSchemaRegionExtractor extractor) {
- setNameAndCreationTime(extractor.getPipeName(),
extractor.getCreationTime());
schemaRegionExtractors.add(extractor);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
index 97f80e5d985..443b6e42d58 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
@@ -32,6 +32,11 @@ public abstract class PipeRemainingOperator {
private long lastNonEmptyTimeStamp = System.currentTimeMillis();
protected boolean isStopped = true;
+ protected PipeRemainingOperator(final String pipeName, final long
creationTime) {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+ }
+
//////////////////////////// Tags ////////////////////////////
public String getPipeName() {
@@ -42,13 +47,6 @@ public abstract class PipeRemainingOperator {
return creationTime;
}
- //////////////////////////// Register & deregister (pipe integration)
////////////////////////////
-
- protected void setNameAndCreationTime(final String pipeName, final long
creationTime) {
- this.pipeName = pipeName;
- this.creationTime = creationTime;
- }
-
//////////////////////////// Switch ////////////////////////////
protected void notifyNonEmpty() {