This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch 1.2-pipe-connot-restart in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d9ba12adb54a418ae3082c546b0e5feb0ab69caf Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Oct 11 16:10:42 2023 +0800 Pipe: running count of the connector does not return to zero after exceptions occur (resulting in the failure of the automatic restart mechanism) (#11279) (cherry picked from commit 8951eb8ce045604914c38e261b03de503fd9916c) --- .../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 76 ++++++++++++++++++++++ .../iotdb/db/pipe/task/subtask/PipeSubtask.java | 10 +++ .../PipeRuntimeConnectorCriticalException.java | 6 +- .../pipe/PipeRuntimeCriticalException.java | 5 -- .../pipe/PipeRuntimeNonCriticalException.java | 5 -- .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 26 ++++---- 6 files changed, 102 insertions(+), 26 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java index a0cc1350949..bd26b0a0fe4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.agent.task; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; @@ -36,10 +37,12 @@ import org.apache.iotdb.db.pipe.task.PipeBuilder; import org.apache.iotdb.db.pipe.task.PipeTask; import org.apache.iotdb.db.pipe.task.PipeTaskBuilder; import org.apache.iotdb.db.pipe.task.PipeTaskManager; +import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -51,6 +54,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -404,6 +408,71 @@ public class PipeTaskAgent { } private void stopAllPipesWithCriticalExceptionInternal() { + // 1. track exception in all pipe tasks that share the same connector that have critical + // exceptions. + final int currentDataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + final Map<PipeParameters, PipeRuntimeConnectorCriticalException> + reusedConnectorParameters2ExceptionMap = new HashMap<>(); + + pipeMetaKeeper + .getPipeMetaList() + .forEach( + pipeMeta -> { + final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta(); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + + runtimeMeta + .getConsensusGroupId2TaskMetaMap() + .values() + .forEach( + pipeTaskMeta -> { + if (pipeTaskMeta.getLeaderDataNodeId() != currentDataNodeId) { + return; + } + + for (final PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) { + if (e instanceof PipeRuntimeConnectorCriticalException) { + reusedConnectorParameters2ExceptionMap.putIfAbsent( + staticMeta.getConnectorParameters(), + (PipeRuntimeConnectorCriticalException) e); + } + } + }); + }); + pipeMetaKeeper + .getPipeMetaList() + .forEach( + pipeMeta -> { + final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta(); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + + runtimeMeta + .getConsensusGroupId2TaskMetaMap() + .values() + .forEach( + pipeTaskMeta -> { + if (pipeTaskMeta.getLeaderDataNodeId() == currentDataNodeId + && reusedConnectorParameters2ExceptionMap.containsKey( + staticMeta.getConnectorParameters()) + && !pipeTaskMeta.containsExceptionMessage( + reusedConnectorParameters2ExceptionMap.get( + staticMeta.getConnectorParameters()))) { + final PipeRuntimeConnectorCriticalException exception = + reusedConnectorParameters2ExceptionMap.get( + staticMeta.getConnectorParameters()); + pipeTaskMeta.trackExceptionMessage(exception); + LOGGER.warn( + "Pipe {} (creation time = {}) will be stopped because of critical exception " + + "(occurred time {}) in connector {}.", + staticMeta.getPipeName(), + DateTimeUtils.convertLongToDate(staticMeta.getCreationTime(), "ms"), + DateTimeUtils.convertLongToDate(exception.getTimeStamp(), "ms"), + staticMeta.getConnectorParameters()); + } + }); + }); + + // 2. stop all pipes that have critical exceptions. pipeMetaKeeper .getPipeMetaList() .forEach( @@ -420,6 +489,13 @@ public class PipeTaskAgent { for (final PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) { if (e instanceof PipeRuntimeCriticalException) { stopPipe(staticMeta.getPipeName(), staticMeta.getCreationTime()); + LOGGER.warn( + "Pipe {} (creation time = {}) was stopped because of critical exception " + + "(occurred time {}).", + staticMeta.getPipeName(), + DateTimeUtils.convertLongToDate( + staticMeta.getCreationTime(), "ms"), + DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms")); return; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java index 5a01d4afe7c..d4e35c35200 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java @@ -137,6 +137,16 @@ public abstract class PipeSubtask this.getClass().getSimpleName(), retryCount.get(), MAX_RETRY_TIMES); + try { + Thread.sleep(1000L * retryCount.get()); + } catch (InterruptedException e) { + LOGGER.warn( + "Interrupted when retrying to execute subtask {}({})", + taskID, + this.getClass().getSimpleName()); + Thread.currentThread().interrupt(); + } + submitSelf(); } else { final String errorMessage = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java index e5a8d71170c..1325ca55bda 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.util.Objects; public class PipeRuntimeConnectorCriticalException extends PipeRuntimeCriticalException { + public PipeRuntimeConnectorCriticalException(String message) { super(message); } @@ -44,11 +45,6 @@ public class PipeRuntimeConnectorCriticalException extends PipeRuntimeCriticalEx && Objects.equals(getTimeStamp(), ((PipeRuntimeException) obj).getTimeStamp()); } - @Override - public int hashCode() { - return Objects.hash(getMessage(), getTimeStamp()); - } - @Override public void serialize(ByteBuffer byteBuffer) { PipeRuntimeExceptionType.CONNECTOR_CRITICAL_EXCEPTION.serialize(byteBuffer); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java index 9f9076be32d..0d3c0c2a786 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java @@ -45,11 +45,6 @@ public class PipeRuntimeCriticalException extends PipeRuntimeException { && Objects.equals(getTimeStamp(), ((PipeRuntimeException) obj).getTimeStamp()); } - @Override - public int hashCode() { - return Objects.hash(getMessage(), getTimeStamp()); - } - @Override public void serialize(ByteBuffer byteBuffer) { PipeRuntimeExceptionType.CRITICAL_EXCEPTION.serialize(byteBuffer); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java index 64c7a66b2d8..6b072a4b6ab 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java @@ -45,11 +45,6 @@ public class PipeRuntimeNonCriticalException extends PipeRuntimeException { && Objects.equals(getTimeStamp(), ((PipeRuntimeException) obj).getTimeStamp()); } - @Override - public int hashCode() { - return Objects.hash(getMessage(), getTimeStamp()); - } - @Override public void serialize(ByteBuffer byteBuffer) { PipeRuntimeExceptionType.NON_CRITICAL_EXCEPTION.serialize(byteBuffer); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java index d24ed912bf6..e5a80f681be 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java @@ -34,10 +34,9 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Map; import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -55,7 +54,8 @@ public class PipeTaskMeta { * <p>The failure of them, respectively, will lead to the stop of the pipe, the stop of the pipes * sharing the same connector, and nothing. */ - private final Queue<PipeRuntimeException> exceptionMessages = new ConcurrentLinkedQueue<>(); + private final Map<PipeRuntimeException, PipeRuntimeException> exceptionMessages = + new ConcurrentHashMap<>(); public PipeTaskMeta(/* @NotNull */ ProgressIndex progressIndex, int leaderDataNodeId) { this.progressIndex.set(progressIndex); @@ -80,11 +80,15 @@ public class PipeTaskMeta { } public synchronized Iterable<PipeRuntimeException> getExceptionMessages() { - return new ArrayList<>(exceptionMessages); + return new ArrayList<>(exceptionMessages.values()); } public synchronized void trackExceptionMessage(PipeRuntimeException exceptionMessage) { - exceptionMessages.add(exceptionMessage); + exceptionMessages.put(exceptionMessage, exceptionMessage); + } + + public synchronized boolean containsExceptionMessage(PipeRuntimeException exceptionMessage) { + return exceptionMessages.containsKey(exceptionMessage); } public synchronized void clearExceptionMessages() { @@ -97,7 +101,7 @@ public class PipeTaskMeta { ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream); ReadWriteIOUtils.write(exceptionMessages.size(), outputStream); - for (final PipeRuntimeException pipeRuntimeException : exceptionMessages) { + for (final PipeRuntimeException pipeRuntimeException : exceptionMessages.values()) { pipeRuntimeException.serialize(outputStream); } } @@ -108,7 +112,7 @@ public class PipeTaskMeta { ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream); ReadWriteIOUtils.write(exceptionMessages.size(), outputStream); - for (final PipeRuntimeException pipeRuntimeException : exceptionMessages) { + for (final PipeRuntimeException pipeRuntimeException : exceptionMessages.values()) { pipeRuntimeException.serialize(outputStream); } } @@ -123,7 +127,7 @@ public class PipeTaskMeta { for (int i = 0; i < size; ++i) { final PipeRuntimeException pipeRuntimeException = PipeRuntimeExceptionType.deserializeFrom(version, byteBuffer); - pipeTaskMeta.exceptionMessages.add(pipeRuntimeException); + pipeTaskMeta.exceptionMessages.put(pipeRuntimeException, pipeRuntimeException); } return pipeTaskMeta; } @@ -139,7 +143,7 @@ public class PipeTaskMeta { for (int i = 0; i < size; ++i) { final PipeRuntimeException pipeRuntimeException = PipeRuntimeExceptionType.deserializeFrom(version, inputStream); - pipeTaskMeta.exceptionMessages.add(pipeRuntimeException); + pipeTaskMeta.exceptionMessages.put(pipeRuntimeException, pipeRuntimeException); } return pipeTaskMeta; } @@ -155,7 +159,7 @@ public class PipeTaskMeta { PipeTaskMeta that = (PipeTaskMeta) obj; return progressIndex.get().equals(that.progressIndex.get()) && leaderDataNodeId.get() == that.leaderDataNodeId.get() - && Arrays.equals(exceptionMessages.toArray(), that.exceptionMessages.toArray()); + && exceptionMessages.equals(that.exceptionMessages); } @Override
