This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5848 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ad0d1fae9146f090bf6fdd40e4603f3f3ad9598d Author: Steve Yurong Su <[email protected]> AuthorDate: Mon May 8 18:33:57 2023 +0800 introduce reconnect strategy when connection broken --- .../api/exception/PipeConnectionException.java | 31 +++++++++++++ .../db/pipe/task/subtask/PipeConnectorSubtask.java | 53 +++++++++++++++++++++- .../iotdb/db/pipe/task/subtask/PipeSubtask.java | 6 +-- 3 files changed, 86 insertions(+), 4 deletions(-) diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java new file mode 100644 index 0000000000..14cacc73fe --- /dev/null +++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.api.exception; + +public class PipeConnectionException extends PipeException { + + public PipeConnectionException(String message) { + super(message); + } + + public PipeConnectionException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java index b420d9e51a..c7ab2bd58e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java @@ -19,13 +19,16 @@ package org.apache.iotdb.db.pipe.task.subtask; +import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +43,9 @@ public class PipeConnectorSubtask extends PipeSubtask { // output private final PipeConnector pipeConnector; - /** @param taskID connectorAttributeSortedString */ + /** + * @param taskID connectorAttributeSortedString + */ public PipeConnectorSubtask(String taskID, PipeConnector pipeConnector) { super(taskID); // TODO: make the size of the queue size reasonable and configurable @@ -59,6 +64,14 @@ public class PipeConnectorSubtask extends PipeSubtask { return; } + try { + // TODO: reduce the frequency of heartbeat + pipeConnector.heartbeat(); + } catch (Exception e) { + throw new PipeConnectionException( + "PipeConnector: failed to connect to the target system.", e); + } + final Event event = pendingQueue.poll(); try { @@ -79,6 +92,44 @@ public class PipeConnectorSubtask extends PipeSubtask { } } + @Override + public void onFailure(@NotNull Throwable throwable) { + // retry to connect to the target system if the connection is broken + if (throwable instanceof PipeConnectionException) { + int retry = 0; + while (retry < MAX_RETRY_TIMES) { + try { + pipeConnector.handshake(); + break; + } catch (Exception e) { + retry++; + LOGGER.error("Failed to reconnect to the target system, retrying... ({} time(s))", retry); + try { + // TODO: make the retry interval configurable + Thread.sleep(retry * 1000L); + } catch (InterruptedException interruptedException) { + interruptedException.printStackTrace(); + } + } + } + + // stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES + // times + if (retry == MAX_RETRY_TIMES) { + LOGGER.error( + "Failed to reconnect to the target system after {} times, stopping current pipe task {}...", + MAX_RETRY_TIMES, + taskID); + lastFailedCause = throwable; + PipeAgent.runtime().report(this); + return; + } + } + + // handle other exceptions as usual + super.onFailure(throwable); + } + @Override public void close() { try { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java index 45b0331700..e01c139018 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java @@ -38,17 +38,17 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtask.class); - private final String taskID; + protected final String taskID; private ListeningExecutorService subtaskWorkerThreadPoolExecutor; private ExecutorService subtaskCallbackListeningExecutor; private final DecoratingLock callbackDecoratingLock = new DecoratingLock(); - private static final int MAX_RETRY_TIMES = 5; + protected static final int MAX_RETRY_TIMES = 5; private final AtomicInteger retryCount = new AtomicInteger(0); - private Throwable lastFailedCause; + protected Throwable lastFailedCause; private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
