This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new c3c06029b65 Pipe: Fixed the bug that the PipeDataNodeRemainingEventAndTimeMetrics may generate NullPointerException (#14015) (#14024)
c3c06029b65 is described below

commit c3c06029b6505377a16a69289294eee56256e60a
Author: Caideyipi <[email protected]>
AuthorDate: Fri Nov 8 15:36:21 2024 +0800

    Pipe: Fixed the bug that the PipeDataNodeRemainingEventAndTimeMetrics may generate NullPointerException (#14015) (#14024)
---
 .../metric/PipeConfigNodeRemainingTimeMetrics.java |  9 ++++-
 .../PipeConfigNodeRemainingTimeOperator.java       |  5 ++-
 .../event/common/heartbeat/PipeHeartbeatEvent.java |  4 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  4 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |  4 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  4 +-
 .../PipeDataNodeRemainingEventAndTimeMetrics.java  | 45 +++++++++++++++-------
 .../PipeDataNodeRemainingEventAndTimeOperator.java |  5 ++-
 .../commons/pipe/metric/PipeRemainingOperator.java | 12 +++---
 9 files changed, 59 insertions(+), 33 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
index ab5440eab2d..23c43461c78 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
@@ -103,7 +103,11 @@ public class PipeConfigNodeRemainingTimeMetrics implements 
IMetricSet {
     // The metric is global thus the regionId is omitted
     final String pipeID = extractor.getPipeName() + "_" + 
extractor.getCreationTime();
     remainingTimeOperatorMap
-        .computeIfAbsent(pipeID, k -> new 
PipeConfigNodeRemainingTimeOperator())
+        .computeIfAbsent(
+            pipeID,
+            k ->
+                new PipeConfigNodeRemainingTimeOperator(
+                    extractor.getPipeName(), extractor.getCreationTime()))
         .register(extractor);
     if (Objects.nonNull(metricService)) {
       createMetrics(pipeID);
@@ -157,7 +161,8 @@ public class PipeConfigNodeRemainingTimeMetrics implements 
IMetricSet {
   public double getRemainingTime(final String pipeName, final long 
creationTime) {
     return remainingTimeOperatorMap
         .computeIfAbsent(
-            pipeName + "_" + creationTime, k -> new 
PipeConfigNodeRemainingTimeOperator())
+            pipeName + "_" + creationTime,
+            k -> new PipeConfigNodeRemainingTimeOperator(pipeName, 
creationTime))
         .getRemainingTime();
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
index e53a4bb1cb2..298bc521b74 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
@@ -43,6 +43,10 @@ class PipeConfigNodeRemainingTimeOperator extends 
PipeRemainingOperator {
 
   private double lastConfigRegionCommitSmoothingValue = Long.MAX_VALUE;
 
+  PipeConfigNodeRemainingTimeOperator(String pipeName, long creationTime) {
+    super(pipeName, creationTime);
+  }
+
   //////////////////////////// Remaining time calculation 
////////////////////////////
 
   /**
@@ -91,7 +95,6 @@ class PipeConfigNodeRemainingTimeOperator extends 
PipeRemainingOperator {
   //////////////////////////// Register & deregister (pipe integration) 
////////////////////////////
 
   void register(final IoTDBConfigRegionExtractor extractor) {
-    setNameAndCreationTime(extractor.getPipeName(), 
extractor.getCreationTime());
     configRegionExtractors.add(extractor);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index e64c3e12cd6..29f770686ea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -84,7 +84,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     if (Objects.nonNull(pipeName)) {
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-          .increaseHeartbeatEventCount(pipeName + "_" + creationTime);
+          .increaseHeartbeatEventCount(pipeName, creationTime);
     }
     return true;
   }
@@ -95,7 +95,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
     // not the event copied and passed to the extractor
     if (Objects.nonNull(pipeName)) {
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-          .decreaseHeartbeatEventCount(pipeName + "_" + creationTime);
+          .decreaseHeartbeatEventCount(pipeName, creationTime);
       if (shouldPrintMessage && LOGGER.isDebugEnabled()) {
         LOGGER.debug(this.toString());
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 5932c6b544b..4979a3d2777 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -136,7 +136,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       PipeDataNodeResourceManager.wal().pin(walEntryHandler);
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-            .increaseTabletEventCount(pipeName + "_" + creationTime);
+            .increaseTabletEventCount(pipeName, creationTime);
       }
       return true;
     } catch (final Exception e) {
@@ -169,7 +169,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     } finally {
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-            .decreaseTabletEventCount(pipeName + "_" + creationTime);
+            .decreaseTabletEventCount(pipeName, creationTime);
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index b89c11b1f39..24e3c7cd1d4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -118,7 +118,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
                 PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
     if (Objects.nonNull(pipeName)) {
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-          .increaseTabletEventCount(pipeName + "_" + creationTime);
+          .increaseTabletEventCount(pipeName, creationTime);
     }
     return true;
   }
@@ -127,7 +127,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     if (Objects.nonNull(pipeName)) {
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-          .decreaseTabletEventCount(pipeName + "_" + creationTime);
+          .decreaseTabletEventCount(pipeName, creationTime);
     }
     allocatedMemoryBlock.close();
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 699dceea669..0a19cf2855c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -249,7 +249,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
       }
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-            .increaseTsFileEventCount(pipeName + "_" + creationTime);
+            .increaseTsFileEventCount(pipeName, creationTime);
       }
       return true;
     } catch (final Exception e) {
@@ -280,7 +280,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
     } finally {
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-            .decreaseTsFileEventCount(pipeName + "_" + creationTime);
+            .decreaseTsFileEventCount(pipeName, creationTime);
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index 25c2ede2407..85be65eb9f5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -122,46 +122,62 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
     // The metric is global thus the regionId is omitted
     final String pipeID = extractor.getPipeName() + "_" + 
extractor.getCreationTime();
     remainingEventAndTimeOperatorMap
-        .computeIfAbsent(pipeID, k -> new 
PipeDataNodeRemainingEventAndTimeOperator())
+        .computeIfAbsent(
+            pipeID,
+            k ->
+                new PipeDataNodeRemainingEventAndTimeOperator(
+                    extractor.getPipeName(), extractor.getCreationTime()))
         .register(extractor);
     if (Objects.nonNull(metricService)) {
       createMetrics(pipeID);
     }
   }
 
-  public void increaseTabletEventCount(final String pipeID) {
+  public void increaseTabletEventCount(final String pipeName, final long 
creationTime) {
     remainingEventAndTimeOperatorMap
-        .computeIfAbsent(pipeID, k -> new 
PipeDataNodeRemainingEventAndTimeOperator())
+        .computeIfAbsent(
+            pipeName + "_" + creationTime,
+            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
         .increaseTabletEventCount();
   }
 
-  public void decreaseTabletEventCount(final String pipeID) {
+  public void decreaseTabletEventCount(final String pipeName, final long 
creationTime) {
     remainingEventAndTimeOperatorMap
-        .computeIfAbsent(pipeID, k -> new 
PipeDataNodeRemainingEventAndTimeOperator())
+        .computeIfAbsent(
+            pipeName + "_" + creationTime,
+            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
         .decreaseTabletEventCount();
   }
 
-  public void increaseTsFileEventCount(final String pipeID) {
+  public void increaseTsFileEventCount(final String pipeName, final long 
creationTime) {
     remainingEventAndTimeOperatorMap
-        .computeIfAbsent(pipeID, k -> new 
PipeDataNodeRemainingEventAndTimeOperator())
+        .computeIfAbsent(
+            pipeName + "_" + creationTime,
+            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
         .increaseTsFileEventCount();
   }
 
-  public void decreaseTsFileEventCount(final String pipeID) {
+  public void decreaseTsFileEventCount(final String pipeName, final long 
creationTime) {
     remainingEventAndTimeOperatorMap
-        .computeIfAbsent(pipeID, k -> new 
PipeDataNodeRemainingEventAndTimeOperator())
+        .computeIfAbsent(
+            pipeName + "_" + creationTime,
+            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
         .decreaseTsFileEventCount();
   }
 
-  public void increaseHeartbeatEventCount(final String pipeID) {
+  public void increaseHeartbeatEventCount(final String pipeName, final long 
creationTime) {
     remainingEventAndTimeOperatorMap
-        .computeIfAbsent(pipeID, k -> new 
PipeDataNodeRemainingEventAndTimeOperator())
+        .computeIfAbsent(
+            pipeName + "_" + creationTime,
+            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
         .increaseHeartbeatEventCount();
   }
 
-  public void decreaseHeartbeatEventCount(final String pipeID) {
+  public void decreaseHeartbeatEventCount(final String pipeName, final long 
creationTime) {
     remainingEventAndTimeOperatorMap
-        .computeIfAbsent(pipeID, k -> new 
PipeDataNodeRemainingEventAndTimeOperator())
+        .computeIfAbsent(
+            pipeName + "_" + creationTime,
+            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
         .decreaseHeartbeatEventCount();
   }
 
@@ -237,7 +253,8 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
       final String pipeName, final long creationTime) {
     final PipeDataNodeRemainingEventAndTimeOperator operator =
         remainingEventAndTimeOperatorMap.computeIfAbsent(
-            pipeName + "_" + creationTime, k -> new 
PipeDataNodeRemainingEventAndTimeOperator());
+            pipeName + "_" + creationTime,
+            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
     return new Pair<>(operator.getRemainingEvents(), 
operator.getRemainingTime());
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index bee0e6975b4..4194acc9a03 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -56,6 +56,10 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
   private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
   private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
 
+  PipeDataNodeRemainingEventAndTimeOperator(final String pipeName, final long 
creationTime) {
+    super(pipeName, creationTime);
+  }
+
   //////////////////////////// Remaining event & time calculation 
////////////////////////////
 
   void increaseTabletEventCount() {
@@ -163,7 +167,6 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
   //////////////////////////// Register & deregister (pipe integration) 
////////////////////////////
 
   void register(final IoTDBSchemaRegionExtractor extractor) {
-    setNameAndCreationTime(extractor.getPipeName(), 
extractor.getCreationTime());
     schemaRegionExtractors.add(extractor);
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
index 97f80e5d985..443b6e42d58 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
@@ -32,6 +32,11 @@ public abstract class PipeRemainingOperator {
   private long lastNonEmptyTimeStamp = System.currentTimeMillis();
   protected boolean isStopped = true;
 
+  protected PipeRemainingOperator(final String pipeName, final long 
creationTime) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+  }
+
   //////////////////////////// Tags ////////////////////////////
 
   public String getPipeName() {
@@ -42,13 +47,6 @@ public abstract class PipeRemainingOperator {
     return creationTime;
   }
 
-  //////////////////////////// Register & deregister (pipe integration) 
////////////////////////////
-
-  protected void setNameAndCreationTime(final String pipeName, final long 
creationTime) {
-    this.pipeName = pipeName;
-    this.creationTime = creationTime;
-  }
-
   //////////////////////////// Switch ////////////////////////////
 
   protected void notifyNonEmpty() {

Reply via email to