This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new a8940b691ab Pipe: Fix infinite loop when thread is interrupted in invoking PipeMemoryBlock#close & Avoid throwing new InterruptedException in conditions that can be self-restoring (#14471) (#14486)
a8940b691ab is described below

commit a8940b691ab4ed4b8f3a2e19898caf1a72ac803d
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Dec 18 18:57:55 2024 +0800

    Pipe: Fix infinite loop when thread is interrupted in invoking PipeMemoryBlock#close & Avoid throwing new InterruptedException in conditions that can be self-restoring (#14471) (#14486)
---
 .../event/common/tsfile/PipeTsFileInsertionEvent.java   | 17 ++++++++++++-----
 .../iotdb/db/pipe/resource/memory/PipeMemoryBlock.java  | 14 +++++++++++++-
 .../event/response/SubscriptionEventTsFileResponse.java |  2 +-
 3 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d29afdbee83..71a21eb5f05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -414,13 +414,20 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
       }
       waitForResourceEnough4Parsing(timeoutMs);
       return initDataContainer().toTabletInsertionEvents();
-    } catch (final InterruptedException e) {
-      Thread.currentThread().interrupt();
+    } catch (final Exception e) {
       close();
 
+      // close() should be called before re-interrupting the thread
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+
       final String errorMsg =
-          String.format(
-              "Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath());
+          e instanceof InterruptedException
+              ? String.format(
+                  "Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath())
+              : String.format(
+                  "Parse TsFile %s error. Because: %s", resource.getTsFilePath(), e.getMessage());
       LOGGER.warn(errorMsg, e);
       throw new PipeException(errorMsg);
     }
@@ -458,7 +465,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
 
       if (waitTimeSeconds * 1000 > timeoutMs) {
         // should contain 'TimeoutException' in exception message
-        throw new InterruptedException(
+        throw new PipeException(
             String.format("TimeoutException: Waited %s seconds", waitTimeSeconds));
       }
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 8ebe90b388c..07f5b904523 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -170,20 +170,32 @@ public class PipeMemoryBlock implements AutoCloseable {
 
   @Override
   public void close() {
+    boolean isInterrupted = false;
+
     while (true) {
       try {
         if (lock.tryLock(50, TimeUnit.MICROSECONDS)) {
           try {
             pipeMemoryManager.release(this);
+            if (isInterrupted) {
+              LOGGER.warn("{} is released after thread interruption.", this);
+            }
             break;
           } finally {
             lock.unlock();
           }
         }
       } catch (final InterruptedException e) {
-        Thread.currentThread().interrupt();
+        // Each time the close task is run, it means that the interrupt status left by the previous
+        // tryLock does not need to be retained. Otherwise, it will lead to an infinite loop.
+        isInterrupted = true;
         LOGGER.warn("Interrupted while waiting for the lock.", e);
       }
     }
+
+    // Restore the interrupt status of the current thread
+    if (isInterrupted) {
+      Thread.currentThread().interrupt();
+    }
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
index 6c102c21912..2397ec51c60 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
@@ -234,7 +234,7 @@ public class SubscriptionEventTsFileResponse extends SubscriptionEventExtendable
       if (waitTimeSeconds * 1000 > timeoutMs) {
         // should contain 'TimeoutException' in exception message
         // see org.apache.iotdb.rpc.subscription.exception.SubscriptionTimeoutException.KEYWORD
-        throw new InterruptedException(
+        throw new SubscriptionException(
             String.format("TimeoutException: Waited %s seconds", waitTimeSeconds));
       }
     }

Reply via email to