This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 40fc15b23bc Pipe: fix connector subtasks can not be stopped and
restarted after exception reporting by connector subtasks (#11979)
40fc15b23bc is described below
commit 40fc15b23bcc6fca83412f1d9796763f03ee7eca
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jan 26 18:30:25 2024 +0800
Pipe: fix connector subtasks can not be stopped and restarted after
exception reporting by connector subtasks (#11979)
---
.../subtask/connector/PipeConnectorSubtask.java | 10 ++++--
.../connector/PipeConnectorSubtaskLifeCycle.java | 42 +++++++++++-----------
2 files changed, 28 insertions(+), 24 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 967ca2d6b86..f472a07fa5f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -207,14 +207,18 @@ public class PipeConnectorSubtask extends
PipeDataNodeSubtask {
if (throwable instanceof PipeConnectionException) {
// Retry to connect to the target system if the connection is broken
+ // We should reconstruct the client before re-submit the subtask
if (onPipeConnectionException(throwable)) {
// return if the pipe task should be stopped
return;
}
}
- // Handle other exceptions as usual
- super.onFailure(throwable);
+ // Handle exceptions if any available clients exist
+ // Notice that the PipeRuntimeConnectorCriticalException must be thrown here
+ // because the upper layer relies on this to stop all the related pipe tasks
+ // Other exceptions may cause the subtask to stop forever and can not be restarted
+ super.onFailure(new
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
}
/** @return true if the pipe task should be stopped, false otherwise */
@@ -252,7 +256,7 @@ public class PipeConnectorSubtask extends
PipeDataNodeSubtask {
}
}
- // Stop current pipe task if failed to reconnect to
+ // Stop current pipe task directly if failed to reconnect to
// the target system after MAX_RETRY_TIMES times
if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
((EnrichedEvent) lastEvent)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index 6759dbdbaa2..6600fae8bc3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -35,7 +35,7 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
private final BoundedBlockingPendingQueue<Event> pendingQueue;
private int runningTaskCount;
- private int aliveTaskCount;
+ private int registeredTaskCount;
public PipeConnectorSubtaskLifeCycle(
PipeConnectorSubtaskExecutor executor,
@@ -46,7 +46,7 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
this.pendingQueue = pendingQueue;
runningTaskCount = 0;
- aliveTaskCount = 0;
+ registeredTaskCount = 0;
}
public PipeConnectorSubtask getSubtask() {
@@ -58,44 +58,44 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
}
public synchronized void register() {
- if (aliveTaskCount < 0) {
- throw new IllegalStateException("aliveTaskCount < 0");
+ if (registeredTaskCount < 0) {
+ throw new IllegalStateException("registeredTaskCount < 0");
}
- if (aliveTaskCount == 0) {
+ if (registeredTaskCount == 0) {
executor.register(subtask);
runningTaskCount = 0;
}
- aliveTaskCount++;
+ registeredTaskCount++;
LOGGER.info(
- "Register subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ "Register subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
- aliveTaskCount);
+ registeredTaskCount);
}
/**
* Deregister the subtask. If the subtask is the last one, close the subtask.
*
* <p>Note that this method should be called after the subtask is stopped. Otherwise, the
- * runningTaskCount might be inconsistent with the aliveTaskCount because of parallel connector
- * scheduling.
+ * runningTaskCount might be inconsistent with the registeredTaskCount because of parallel
+ * connector scheduling.
*
* @param pipeNameToDeregister pipe name
* @return true if the subtask is out of life cycle, indicating that the subtask should never be
* used again
- * @throws IllegalStateException if aliveTaskCount <= 0
+ * @throws IllegalStateException if registeredTaskCount <= 0
*/
public synchronized boolean deregister(String pipeNameToDeregister) {
- if (aliveTaskCount <= 0) {
- throw new IllegalStateException("aliveTaskCount <= 0");
+ if (registeredTaskCount <= 0) {
+ throw new IllegalStateException("registeredTaskCount <= 0");
}
subtask.discardEventsOfPipe(pipeNameToDeregister);
try {
- if (aliveTaskCount > 1) {
+ if (registeredTaskCount > 1) {
return false;
}
@@ -103,12 +103,12 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
// This subtask is out of life cycle, should never be used again
return true;
} finally {
- aliveTaskCount--;
+ registeredTaskCount--;
LOGGER.info(
- "Deregister subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ "Deregister subtask {}. runningTaskCount: {}, registeredTaskCount:
{}",
subtask,
runningTaskCount,
- aliveTaskCount);
+ registeredTaskCount);
}
}
@@ -123,10 +123,10 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
runningTaskCount++;
LOGGER.info(
- "Start subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ "Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
- aliveTaskCount);
+ registeredTaskCount);
}
public synchronized void stop() {
@@ -140,10 +140,10 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
runningTaskCount--;
LOGGER.info(
- "Stop subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ "Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
- aliveTaskCount);
+ registeredTaskCount);
}
@Override