This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1c3ab3900c7 Pipe: fix drop pipe stuck (make connector subtask close
faster by early returning) (#11222)
1c3ab3900c7 is described below
commit 1c3ab3900c77e53ec4b3ca0ad06651c84e936b47
Author: 马子坤 <[email protected]>
AuthorDate: Sat Oct 7 09:57:06 2023 +0800
Pipe: fix drop pipe stuck (make connector subtask close faster by early
returning) (#11222)
If we drop a pipe while it is transferring a large TsFile, the "drop pipe"
statement will be stuck for a while until the TsFile is fully transferred or
the query times out.
This commit makes the connector subtask close faster by returning early.
---
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 10 +++++-
.../pipe/task/connection/PipeEventCollector.java | 14 ++++++--
.../iotdb/db/pipe/task/subtask/PipeSubtask.java | 19 ++++++++--
.../subtask/connector/PipeConnectorSubtask.java | 42 +++++++++++++++-------
.../subtask/processor/PipeProcessorSubtask.java | 32 +++++++++--------
5 files changed, 84 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 0d304b4f1a8..9817331b62b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -27,6 +27,9 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.pipe.api.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -35,6 +38,8 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public abstract class EnrichedEvent implements Event {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(EnrichedEvent.class);
+
private final AtomicInteger referenceCount;
protected final PipeTaskMeta pipeTaskMeta;
@@ -94,7 +99,10 @@ public abstract class EnrichedEvent implements Event {
reportProgress();
}
}
- referenceCount.decrementAndGet();
+ final int newReferenceCount = referenceCount.decrementAndGet();
+ if (newReferenceCount < 0) {
+ LOGGER.warn("reference count is decreased to {}.", newReferenceCount);
+ }
}
return isSuccessful;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index c4368c7eda4..428a3b38d15 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
public class PipeEventCollector implements EventCollector, AutoCloseable {
@@ -32,6 +33,8 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
private final EnrichedDeque<Event> bufferQueue;
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
this.pendingQueue = pendingQueue;
bufferQueue = new EnrichedDeque<>(new LinkedList<>());
@@ -47,7 +50,7 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
}
- while (!bufferQueue.isEmpty()) {
+ while (!isClosed.get() && !bufferQueue.isEmpty()) {
final Event bufferedEvent = bufferQueue.peek();
// Try to put already buffered events into pending queue, if pending
queue is full, wait for
// pending queue to be available with timeout.
@@ -76,7 +79,7 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
* @return true if there are still buffered events after this operation,
false otherwise.
*/
public synchronized boolean tryCollectBufferedEvents() {
- while (!bufferQueue.isEmpty()) {
+ while (!isClosed.get() && !bufferQueue.isEmpty()) {
final Event bufferedEvent = bufferQueue.peek();
if (pendingQueue.waitedOffer(bufferedEvent)) {
bufferQueue.poll();
@@ -87,7 +90,12 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
return false;
}
- public synchronized void close() {
+ public void close() {
+ isClosed.set(true);
+ doClose();
+ }
+
+ private synchronized void doClose() {
bufferQueue.forEach(
event -> {
if (event instanceof EnrichedEvent) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 15d81348bb3..5a01d4afe7c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -51,6 +51,7 @@ public abstract class PipeSubtask
// For controlling the subtask execution
protected final AtomicBoolean shouldStopSubmittingSelf = new
AtomicBoolean(true);
+ protected final AtomicBoolean isClosed = new AtomicBoolean(false);
protected PipeSubtaskScheduler subtaskScheduler;
// For fail-over
@@ -90,6 +91,11 @@ public abstract class PipeSubtask
return hasAtLeastOneEventProcessed;
}
+ /** Should be synchronized with {@link PipeSubtask#releaseLastEvent} */
+ protected synchronized void setLastEvent(Event event) {
+ lastEvent = event;
+ }
+
/**
* Try to consume an event by the pipe plugin.
*
@@ -107,6 +113,12 @@ public abstract class PipeSubtask
@Override
public void onFailure(@NotNull Throwable throwable) {
+ if (isClosed.get()) {
+ LOGGER.info("onFailure in pipe subtask, ignored because pipe is
dropped.");
+ releaseLastEvent(false);
+ return;
+ }
+
if (retryCount.get() == 0) {
LOGGER.warn(
"Failed to execute subtask {}({}), because of {}. Will retry for {}
times.",
@@ -194,12 +206,15 @@ public abstract class PipeSubtask
return !shouldStopSubmittingSelf.get();
}
+ // synchronized for close() and releaseLastEvent(). make sure that the
lastEvent
+ // will not be updated after pipeProcessor.close() to avoid resource leak
+ // because of the lastEvent is not released.
@Override
- public synchronized void close() {
+ public void close() {
releaseLastEvent(false);
}
- protected void releaseLastEvent(boolean shouldReport) {
+ protected synchronized void releaseLastEvent(boolean shouldReport) {
if (lastEvent != null) {
if (lastEvent instanceof EnrichedEvent) {
((EnrichedEvent)
lastEvent).decreaseReferenceCount(this.getClass().getName(), shouldReport);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index d6732e590d1..ffa339671d2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -89,10 +89,14 @@ public class PipeConnectorSubtask extends PipeSubtask {
}
@Override
- protected synchronized boolean executeOnce() {
+ protected boolean executeOnce() {
+ if (isClosed.get()) {
+ return false;
+ }
+
final Event event = lastEvent != null ? lastEvent :
inputPendingQueue.waitedPoll();
// Record this event for retrying on connection failure or other exceptions
- lastEvent = event;
+ setLastEvent(event);
if (event == null) {
return false;
}
@@ -118,13 +122,23 @@ public class PipeConnectorSubtask extends PipeSubtask {
releaseLastEvent(true);
} catch (PipeConnectionException e) {
- throw e;
+ if (!isClosed.get()) {
+ throw e;
+ } else {
+ LOGGER.info("PipeConnectionException in pipe transfer, ignored because
pipe is dropped.");
+ releaseLastEvent(false);
+ }
} catch (Exception e) {
- throw new PipeException(
- "Error occurred during executing PipeConnector#transfer, perhaps
need to check "
- + "whether the implementation of PipeConnector is correct "
- + "according to the pipe-api description.",
- e);
+ if (!isClosed.get()) {
+ throw new PipeException(
+ "Error occurred during executing PipeConnector#transfer, perhaps
need to check "
+ + "whether the implementation of PipeConnector is correct "
+ + "according to the pipe-api description.",
+ e);
+ } else {
+ LOGGER.info("Exception in pipe transfer, ignored because pipe is
dropped.");
+ releaseLastEvent(false);
+ }
}
return true;
@@ -132,6 +146,12 @@ public class PipeConnectorSubtask extends PipeSubtask {
@Override
public void onFailure(@NotNull Throwable throwable) {
+ if (isClosed.get()) {
+ LOGGER.info("onFailure in pipe transfer, ignored because pipe is
dropped.");
+ releaseLastEvent(false);
+ return;
+ }
+
// Retry to connect to the target system if the connection is broken
if (throwable instanceof PipeConnectionException) {
LOGGER.warn(
@@ -232,10 +252,8 @@ public class PipeConnectorSubtask extends PipeSubtask {
}
@Override
- // Synchronized for outputPipeConnector.close() and releaseLastEvent() in
super.close()
- // make sure that the lastEvent will not be updated after
pipeProcessor.close() to avoid
- // resource leak because of the lastEvent is not released.
- public synchronized void close() {
+ public void close() {
+ isClosed.set(true);
try {
outputPipeConnector.close();
} catch (Exception e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 8f6d12d1b06..2814a40b5ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class PipeProcessorSubtask extends PipeSubtask {
@@ -49,8 +48,6 @@ public class PipeProcessorSubtask extends PipeSubtask {
private final PipeProcessor pipeProcessor;
private final PipeEventCollector outputEventCollector;
- private final AtomicBoolean isClosed;
-
public PipeProcessorSubtask(
String taskID,
EventSupplier inputEventSupplier,
@@ -60,7 +57,6 @@ public class PipeProcessorSubtask extends PipeSubtask {
this.inputEventSupplier = inputEventSupplier;
this.pipeProcessor = pipeProcessor;
this.outputEventCollector = outputEventCollector;
- isClosed = new AtomicBoolean(false);
}
@Override
@@ -84,10 +80,14 @@ public class PipeProcessorSubtask extends PipeSubtask {
}
@Override
- protected synchronized boolean executeOnce() throws Exception {
+ protected boolean executeOnce() throws Exception {
+ if (isClosed.get()) {
+ return false;
+ }
+
final Event event = lastEvent != null ? lastEvent :
inputEventSupplier.supply();
// Record the last event for retry when exception occurs
- lastEvent = event;
+ setLastEvent(event);
if (event == null) {
// Though there is no event to process, there may still be some buffered
events
// in the outputEventCollector. Return true if there are still buffered
events,
@@ -112,11 +112,16 @@ public class PipeProcessorSubtask extends PipeSubtask {
releaseLastEvent(true);
} catch (Exception e) {
- throw new PipeException(
- "Error occurred during executing PipeProcessor#process, perhaps need
to check "
- + "whether the implementation of PipeProcessor is correct "
- + "according to the pipe-api description.",
- e);
+ if (!isClosed.get()) {
+ throw new PipeException(
+ "Error occurred during executing PipeProcessor#process, perhaps
need to check "
+ + "whether the implementation of PipeProcessor is correct "
+ + "according to the pipe-api description.",
+ e);
+ } else {
+ LOGGER.info("Exception in pipe event processing, ignored because pipe
is dropped.");
+ releaseLastEvent(false);
+ }
}
return true;
@@ -130,10 +135,7 @@ public class PipeProcessorSubtask extends PipeSubtask {
}
@Override
- // synchronized for pipeProcessor.close() and releaseLastEvent() in
super.close().
- // make sure that the lastEvent will not be updated after
pipeProcessor.close() to avoid
- // resource leak because of the lastEvent is not released.
- public synchronized void close() {
+ public void close() {
try {
isClosed.set(true);