This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bd22fe65461 Pipe: distinguish between old and new process subtasks
before and after stuck restart to avoid unexpected hashmap override and removal
(#12400)
bd22fe65461 is described below
commit bd22fe6546147a22864eac0893dd673269c52d27
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Apr 24 14:46:36 2024 +0800
Pipe: distinguish between old and new process subtasks before and after
stuck restart to avoid unexpected hashmap override and removal (#12400)
We noticed that when the pipe is stuck and restarted, the hashcode of new
and old process subtasks is identical. This may lead to a situation where the
key and value are inconsistent in the maintenance of ConcurrentHashMap
`subtasks -> subtasks` by `PipeProcessorSubtaskWorker`, thereby causing
unexpectedly removed subtasks that are NOT in the close state by
`cleanupClosedSubtasksIfNecessary` in
`org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtaskWorker`.
---
.../subtask/processor/PipeProcessorSubtask.java | 21 +++++++++++++++++----
1 file changed, 17 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 021099cac91..45568782948 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -41,6 +41,7 @@ import
com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@@ -59,6 +60,10 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
private final String pipeName;
private final int dataRegionId;
+ // This variable is used to distinguish between old and new subtasks before
and after stuck
+ // restart.
+ private final long subtaskCreationTime;
+
public PipeProcessorSubtask(
String taskID,
long creationTime,
@@ -68,6 +73,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
PipeProcessor pipeProcessor,
PipeEventCollector outputEventCollector) {
super(taskID, creationTime);
+ this.subtaskCreationTime = System.currentTimeMillis();
this.pipeName = pipeName;
this.dataRegionId = dataRegionId;
this.inputEventSupplier = inputEventSupplier;
@@ -204,14 +210,21 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
}
@Override
- public boolean equals(Object that) {
- return that instanceof PipeProcessorSubtask
- && this.taskID.equals(((PipeProcessorSubtask) that).taskID);
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final PipeProcessorSubtask that = (PipeProcessorSubtask) obj;
+ return Objects.equals(this.taskID, that.taskID)
+ && Objects.equals(this.subtaskCreationTime, that.subtaskCreationTime);
}
@Override
public int hashCode() {
- return taskID.hashCode();
+ return Objects.hash(taskID, subtaskCreationTime);
}
//////////////////////////// APIs provided for metric framework
////////////////////////////