This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bd22fe65461 Pipe: distinguish between old and new process subtasks 
before and after stuck restart to avoid unexpected hashmap override and removal 
(#12400)
bd22fe65461 is described below

commit bd22fe6546147a22864eac0893dd673269c52d27
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Apr 24 14:46:36 2024 +0800

    Pipe: distinguish between old and new process subtasks before and after 
stuck restart to avoid unexpected hashmap override and removal (#12400)
    
    We noticed that when the pipe is stuck and restarted, the hashcode of new 
and old process subtasks is identical. This may lead to a situation where the 
key and value are inconsistent in the maintenance of ConcurrentHashMap 
`subtasks -> subtasks` by `PipeProcessorSubtaskWorker`, thereby causing 
unexpectedly removed subtasks that are NOT in the close state by 
`cleanupClosedSubtasksIfNecessary` in 
`org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtaskWorker`.
---
 .../subtask/processor/PipeProcessorSubtask.java     | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 021099cac91..45568782948 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -41,6 +41,7 @@ import 
com.google.common.util.concurrent.ListeningExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -59,6 +60,10 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
   private final String pipeName;
   private final int dataRegionId;
 
+  // This variable is used to distinguish between old and new subtasks before 
and after stuck
+  // restart.
+  private final long subtaskCreationTime;
+
   public PipeProcessorSubtask(
       String taskID,
       long creationTime,
@@ -68,6 +73,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
       PipeProcessor pipeProcessor,
       PipeEventCollector outputEventCollector) {
     super(taskID, creationTime);
+    this.subtaskCreationTime = System.currentTimeMillis();
     this.pipeName = pipeName;
     this.dataRegionId = dataRegionId;
     this.inputEventSupplier = inputEventSupplier;
@@ -204,14 +210,21 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
   }
 
   @Override
-  public boolean equals(Object that) {
-    return that instanceof PipeProcessorSubtask
-        && this.taskID.equals(((PipeProcessorSubtask) that).taskID);
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final PipeProcessorSubtask that = (PipeProcessorSubtask) obj;
+    return Objects.equals(this.taskID, that.taskID)
+        && Objects.equals(this.subtaskCreationTime, that.subtaskCreationTime);
   }
 
   @Override
   public int hashCode() {
-    return taskID.hashCode();
+    return Objects.hash(taskID, subtaskCreationTime);
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////

Reply via email to