This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c3ab3900c7 Pipe: fix drop pipe stuck (make connector subtask close 
faster by early returning) (#11222)
1c3ab3900c7 is described below

commit 1c3ab3900c77e53ec4b3ca0ad06651c84e936b47
Author: 马子坤 <[email protected]>
AuthorDate: Sat Oct 7 09:57:06 2023 +0800

    Pipe: fix drop pipe stuck (make connector subtask close faster by early 
returning) (#11222)
    
    If we drop a pipe when it is transferring a large TsFile, the "drop pipe" 
statement will get stuck for a while, until the TsFile is fully transferred or the 
query times out.
    
    This commit makes the connector subtask close faster by returning early.
---
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  | 10 +++++-
 .../pipe/task/connection/PipeEventCollector.java   | 14 ++++++--
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    | 19 ++++++++--
 .../subtask/connector/PipeConnectorSubtask.java    | 42 +++++++++++++++-------
 .../subtask/processor/PipeProcessorSubtask.java    | 32 +++++++++--------
 5 files changed, 84 insertions(+), 33 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 0d304b4f1a8..9817331b62b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -27,6 +27,9 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.pipe.api.event.Event;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -35,6 +38,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class EnrichedEvent implements Event {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(EnrichedEvent.class);
+
   private final AtomicInteger referenceCount;
 
   protected final PipeTaskMeta pipeTaskMeta;
@@ -94,7 +99,10 @@ public abstract class EnrichedEvent implements Event {
           reportProgress();
         }
       }
-      referenceCount.decrementAndGet();
+      final int newReferenceCount = referenceCount.decrementAndGet();
+      if (newReferenceCount < 0) {
+        LOGGER.warn("reference count is decreased to {}.", newReferenceCount);
+      }
     }
     return isSuccessful;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index c4368c7eda4..428a3b38d15 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class PipeEventCollector implements EventCollector, AutoCloseable {
 
@@ -32,6 +33,8 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
 
   private final EnrichedDeque<Event> bufferQueue;
 
+  private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
   public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
     this.pendingQueue = pendingQueue;
     bufferQueue = new EnrichedDeque<>(new LinkedList<>());
@@ -47,7 +50,7 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
       ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
     }
 
-    while (!bufferQueue.isEmpty()) {
+    while (!isClosed.get() && !bufferQueue.isEmpty()) {
       final Event bufferedEvent = bufferQueue.peek();
       // Try to put already buffered events into pending queue, if pending 
queue is full, wait for
       // pending queue to be available with timeout.
@@ -76,7 +79,7 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
    * @return true if there are still buffered events after this operation, 
false otherwise.
    */
   public synchronized boolean tryCollectBufferedEvents() {
-    while (!bufferQueue.isEmpty()) {
+    while (!isClosed.get() && !bufferQueue.isEmpty()) {
       final Event bufferedEvent = bufferQueue.peek();
       if (pendingQueue.waitedOffer(bufferedEvent)) {
         bufferQueue.poll();
@@ -87,7 +90,12 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
     return false;
   }
 
-  public synchronized void close() {
+  public void close() {
+    isClosed.set(true);
+    doClose();
+  }
+
+  private synchronized void doClose() {
     bufferQueue.forEach(
         event -> {
           if (event instanceof EnrichedEvent) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 15d81348bb3..5a01d4afe7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -51,6 +51,7 @@ public abstract class PipeSubtask
 
   // For controlling the subtask execution
   protected final AtomicBoolean shouldStopSubmittingSelf = new 
AtomicBoolean(true);
+  protected final AtomicBoolean isClosed = new AtomicBoolean(false);
   protected PipeSubtaskScheduler subtaskScheduler;
 
   // For fail-over
@@ -90,6 +91,11 @@ public abstract class PipeSubtask
     return hasAtLeastOneEventProcessed;
   }
 
+  /** Should be synchronized with {@link PipeSubtask#releaseLastEvent} */
+  protected synchronized void setLastEvent(Event event) {
+    lastEvent = event;
+  }
+
   /**
    * Try to consume an event by the pipe plugin.
    *
@@ -107,6 +113,12 @@ public abstract class PipeSubtask
 
   @Override
   public void onFailure(@NotNull Throwable throwable) {
+    if (isClosed.get()) {
+      LOGGER.info("onFailure in pipe subtask, ignored because pipe is 
dropped.");
+      releaseLastEvent(false);
+      return;
+    }
+
     if (retryCount.get() == 0) {
       LOGGER.warn(
           "Failed to execute subtask {}({}), because of {}. Will retry for {} 
times.",
@@ -194,12 +206,15 @@ public abstract class PipeSubtask
     return !shouldStopSubmittingSelf.get();
   }
 
+  // synchronized for close() and releaseLastEvent(). make sure that the 
lastEvent
+  // will not be updated after pipeProcessor.close() to avoid resource leak
+  // because of the lastEvent is not released.
   @Override
-  public synchronized void close() {
+  public void close() {
     releaseLastEvent(false);
   }
 
-  protected void releaseLastEvent(boolean shouldReport) {
+  protected synchronized void releaseLastEvent(boolean shouldReport) {
     if (lastEvent != null) {
       if (lastEvent instanceof EnrichedEvent) {
         ((EnrichedEvent) 
lastEvent).decreaseReferenceCount(this.getClass().getName(), shouldReport);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index d6732e590d1..ffa339671d2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -89,10 +89,14 @@ public class PipeConnectorSubtask extends PipeSubtask {
   }
 
   @Override
-  protected synchronized boolean executeOnce() {
+  protected boolean executeOnce() {
+    if (isClosed.get()) {
+      return false;
+    }
+
     final Event event = lastEvent != null ? lastEvent : 
inputPendingQueue.waitedPoll();
     // Record this event for retrying on connection failure or other exceptions
-    lastEvent = event;
+    setLastEvent(event);
     if (event == null) {
       return false;
     }
@@ -118,13 +122,23 @@ public class PipeConnectorSubtask extends PipeSubtask {
 
       releaseLastEvent(true);
     } catch (PipeConnectionException e) {
-      throw e;
+      if (!isClosed.get()) {
+        throw e;
+      } else {
+        LOGGER.info("PipeConnectionException in pipe transfer, ignored because 
pipe is dropped.");
+        releaseLastEvent(false);
+      }
     } catch (Exception e) {
-      throw new PipeException(
-          "Error occurred during executing PipeConnector#transfer, perhaps 
need to check "
-              + "whether the implementation of PipeConnector is correct "
-              + "according to the pipe-api description.",
-          e);
+      if (!isClosed.get()) {
+        throw new PipeException(
+            "Error occurred during executing PipeConnector#transfer, perhaps 
need to check "
+                + "whether the implementation of PipeConnector is correct "
+                + "according to the pipe-api description.",
+            e);
+      } else {
+        LOGGER.info("Exception in pipe transfer, ignored because pipe is 
dropped.");
+        releaseLastEvent(false);
+      }
     }
 
     return true;
@@ -132,6 +146,12 @@ public class PipeConnectorSubtask extends PipeSubtask {
 
   @Override
   public void onFailure(@NotNull Throwable throwable) {
+    if (isClosed.get()) {
+      LOGGER.info("onFailure in pipe transfer, ignored because pipe is 
dropped.");
+      releaseLastEvent(false);
+      return;
+    }
+
     // Retry to connect to the target system if the connection is broken
     if (throwable instanceof PipeConnectionException) {
       LOGGER.warn(
@@ -232,10 +252,8 @@ public class PipeConnectorSubtask extends PipeSubtask {
   }
 
   @Override
-  // Synchronized for outputPipeConnector.close() and releaseLastEvent() in 
super.close()
-  // make sure that the lastEvent will not be updated after 
pipeProcessor.close() to avoid
-  // resource leak because of the lastEvent is not released.
-  public synchronized void close() {
+  public void close() {
+    isClosed.set(true);
     try {
       outputPipeConnector.close();
     } catch (Exception e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 8f6d12d1b06..2814a40b5ae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeProcessorSubtask extends PipeSubtask {
@@ -49,8 +48,6 @@ public class PipeProcessorSubtask extends PipeSubtask {
   private final PipeProcessor pipeProcessor;
   private final PipeEventCollector outputEventCollector;
 
-  private final AtomicBoolean isClosed;
-
   public PipeProcessorSubtask(
       String taskID,
       EventSupplier inputEventSupplier,
@@ -60,7 +57,6 @@ public class PipeProcessorSubtask extends PipeSubtask {
     this.inputEventSupplier = inputEventSupplier;
     this.pipeProcessor = pipeProcessor;
     this.outputEventCollector = outputEventCollector;
-    isClosed = new AtomicBoolean(false);
   }
 
   @Override
@@ -84,10 +80,14 @@ public class PipeProcessorSubtask extends PipeSubtask {
   }
 
   @Override
-  protected synchronized boolean executeOnce() throws Exception {
+  protected boolean executeOnce() throws Exception {
+    if (isClosed.get()) {
+      return false;
+    }
+
     final Event event = lastEvent != null ? lastEvent : 
inputEventSupplier.supply();
     // Record the last event for retry when exception occurs
-    lastEvent = event;
+    setLastEvent(event);
     if (event == null) {
       // Though there is no event to process, there may still be some buffered 
events
       // in the outputEventCollector. Return true if there are still buffered 
events,
@@ -112,11 +112,16 @@ public class PipeProcessorSubtask extends PipeSubtask {
 
       releaseLastEvent(true);
     } catch (Exception e) {
-      throw new PipeException(
-          "Error occurred during executing PipeProcessor#process, perhaps need 
to check "
-              + "whether the implementation of PipeProcessor is correct "
-              + "according to the pipe-api description.",
-          e);
+      if (!isClosed.get()) {
+        throw new PipeException(
+            "Error occurred during executing PipeProcessor#process, perhaps 
need to check "
+                + "whether the implementation of PipeProcessor is correct "
+                + "according to the pipe-api description.",
+            e);
+      } else {
+        LOGGER.info("Exception in pipe event processing, ignored because pipe 
is dropped.");
+        releaseLastEvent(false);
+      }
     }
 
     return true;
@@ -130,10 +135,7 @@ public class PipeProcessorSubtask extends PipeSubtask {
   }
 
   @Override
-  // synchronized for pipeProcessor.close() and releaseLastEvent() in 
super.close().
-  // make sure that the lastEvent will not be updated after 
pipeProcessor.close() to avoid
-  // resource leak because of the lastEvent is not released.
-  public synchronized void close() {
+  public void close() {
     try {
       isClosed.set(true);
 

Reply via email to