This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5848
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ad0d1fae9146f090bf6fdd40e4603f3f3ad9598d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon May 8 18:33:57 2023 +0800

    Introduce a reconnect strategy for when the connection is broken
---
 .../api/exception/PipeConnectionException.java     | 31 +++++++++++++
 .../db/pipe/task/subtask/PipeConnectorSubtask.java | 53 +++++++++++++++++++++-
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    |  6 +--
 3 files changed, 86 insertions(+), 4 deletions(-)

diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
new file mode 100644
index 0000000000..14cacc73fe
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.exception;
+
+/** Thrown when a pipe connector fails to connect to, or communicate with, its target system. */
+public class PipeConnectionException extends PipeException {
+  public PipeConnectionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public PipeConnectionException(String message) {
+    super(message);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index b420d9e51a..c7ab2bd58e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -19,13 +19,16 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +43,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
   // output
   private final PipeConnector pipeConnector;
 
-  /** @param taskID connectorAttributeSortedString */
+  /**
+   * @param taskID connectorAttributeSortedString
+   */
   public PipeConnectorSubtask(String taskID, PipeConnector pipeConnector) {
     super(taskID);
     // TODO: make the size of the queue size reasonable and configurable
@@ -59,6 +64,14 @@ public class PipeConnectorSubtask extends PipeSubtask {
       return;
     }
 
+    try {
+      // TODO: reduce the frequency of heartbeat
+      pipeConnector.heartbeat();
+    } catch (Exception e) {
+      throw new PipeConnectionException(
+          "PipeConnector: failed to connect to the target system.", e);
+    }
+
     final Event event = pendingQueue.poll();
 
     try {
@@ -79,6 +92,44 @@ public class PipeConnectorSubtask extends PipeSubtask {
     }
   }
 
+  /** Reconnects with linear backoff on a broken connection; reports this task if that fails. */
+  @Override
+  public void onFailure(@NotNull Throwable throwable) {
+    if (throwable instanceof PipeConnectionException) {
+      int retry = 0;
+      while (retry < MAX_RETRY_TIMES) {
+        try {
+          pipeConnector.handshake();
+          break;
+        } catch (Exception e) {
+          retry++;
+          LOGGER.error(
+              "Failed to reconnect to the target system, retrying... ({} time(s))", retry, e);
+          if (retry < MAX_RETRY_TIMES) { // no need to back off after the last attempt
+            try {
+              Thread.sleep(retry * 1000L); // TODO: make the retry interval configurable
+            } catch (InterruptedException interruptedException) {
+              Thread.currentThread().interrupt(); // restore interrupt status and give up
+              retry = MAX_RETRY_TIMES;
+            }
+          }
+        }
+      }
+      // stop current pipe task if failed to reconnect after MAX_RETRY_TIMES attempts
+      if (retry == MAX_RETRY_TIMES) {
+        LOGGER.error(
+            "Failed to reconnect to the target system after {} times, stopping current pipe task {}...",
+            MAX_RETRY_TIMES,
+            taskID);
+        lastFailedCause = throwable;
+        PipeAgent.runtime().report(this);
+        return;
+      }
+    }
+    // other exceptions, or a broken connection that was re-established, are handled as usual
+    super.onFailure(throwable);
+  }
+
   @Override
   public void close() {
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 45b0331700..e01c139018 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -38,17 +38,17 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeSubtask.class);
 
-  private final String taskID;
+  protected final String taskID;
 
   private ListeningExecutorService subtaskWorkerThreadPoolExecutor;
   private ExecutorService subtaskCallbackListeningExecutor;
 
   private final DecoratingLock callbackDecoratingLock = new DecoratingLock();
 
-  private static final int MAX_RETRY_TIMES = 5;
+  protected static final int MAX_RETRY_TIMES = 5;
   private final AtomicInteger retryCount = new AtomicInteger(0);
 
-  private Throwable lastFailedCause;
+  protected Throwable lastFailedCause;
 
   private final AtomicBoolean shouldStopSubmittingSelf = new 
AtomicBoolean(true);
 

Reply via email to