This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 809b3aaff8e Pipe: Fixed the bug that a failed checksum may 
permanently disable an air gap receiver & Optimized the air gap file offset 
logic & Fixed the bug that a socket that is not connected may die forever in 
"send" (#14384)
809b3aaff8e is described below

commit 809b3aaff8ee5b1296ed516c438a3c9ca18bac5d
Author: Caideyipi <[email protected]>
AuthorDate: Fri Dec 13 12:55:54 2024 +0800

    Pipe: Fixed the bug that a failed checksum may permanently disable an air 
gap receiver & Optimized the air gap file offset logic & Fixed the bug that a 
socket that is not connected may die forever in "send" (#14384)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../api/exception/PipeConnectionException.java     |  4 +-
 .../iotdb/pipe/api/exception/PipeException.java    |  6 +--
 .../subtask/connector/PipeConnectorSubtask.java    | 34 -------------
 .../pipeconsensus/PipeConsensusAsyncConnector.java |  4 +-
 .../pipeconsensus/PipeConsensusSyncConnector.java  | 56 +++++++++++-----------
 .../protocol/airgap/IoTDBAirGapReceiver.java       | 45 ++++++++++++++---
 .../protocol/airgap/IoTDBAirGapReceiverAgent.java  |  7 ++-
 .../task/subtask/SubscriptionConnectorSubtask.java |  9 ----
 .../iotdb/commons/concurrent/ThreadName.java       |  4 ++
 .../PipeRuntimeConnectorCriticalException.java     | 14 +++---
 ...meConnectorRetryTimesConfigurableException.java |  3 +-
 .../pipe/PipeRuntimeCriticalException.java         | 14 +++---
 .../exception/pipe/PipeRuntimeException.java       | 10 ++--
 .../connector/protocol/IoTDBAirGapConnector.java   |  6 ++-
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 37 +++++++-------
 .../commons/pipe/receiver/IoTDBReceiverAgent.java  |  6 +--
 16 files changed, 132 insertions(+), 127 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
index dc9d5e32968..ded27cc433a 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
@@ -24,11 +24,11 @@ public class PipeConnectionException extends PipeException {
   public static final String CONNECTION_ERROR_FORMATTER =
       "Error occurred while connecting to receiver %s:%s, please check network 
connectivity or SSL configurations when enable SSL transmission";
 
-  public PipeConnectionException(String message) {
+  public PipeConnectionException(final String message) {
     super(message);
   }
 
-  public PipeConnectionException(String message, Throwable cause) {
+  public PipeConnectionException(final String message, final Throwable cause) {
     super(message, cause);
   }
 }
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java
index 3291f87be0d..7be6d6a5560 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java
@@ -23,17 +23,17 @@ public class PipeException extends RuntimeException {
 
   private final long timeStamp;
 
-  public PipeException(String message) {
+  public PipeException(final String message) {
     super(message);
     this.timeStamp = System.currentTimeMillis();
   }
 
-  public PipeException(String message, long timeStamp) {
+  public PipeException(final String message, final long timeStamp) {
     super(message);
     this.timeStamp = timeStamp;
   }
 
-  public PipeException(String message, Throwable cause) {
+  public PipeException(final String message, final Throwable cause) {
     super(message, cause);
     this.timeStamp = System.currentTimeMillis();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 6171e50b95c..cc2d5b5b346 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -44,9 +44,6 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
-
 public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConnectorSubtask.class);
@@ -295,37 +292,6 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
         : 0;
   }
 
-  // For performance, this will not acquire lock and does not guarantee the 
correct
-  // result. However, this shall not cause any exceptions when concurrently 
read & written.
-  public int getEventCount(final String pipeName) {
-    final AtomicInteger count = new AtomicInteger(0);
-    try {
-      inputPendingQueue.forEach(
-          event -> {
-            if (event instanceof EnrichedEvent
-                && pipeName.equals(((EnrichedEvent) event).getPipeName())) {
-              count.incrementAndGet();
-            }
-          });
-    } catch (final Exception e) {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "Exception occurred when counting event of pipe {}, root cause: 
{}",
-            pipeName,
-            ErrorHandlingUtils.getRootCause(e).getMessage(),
-            e);
-      }
-    }
-    // Avoid potential NPE in "getPipeName"
-    final EnrichedEvent event =
-        lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
-    return count.get()
-        + (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
-            ? ((IoTDBDataRegionAsyncConnector) 
outputPipeConnector).getRetryEventCount(pipeName)
-            : 0)
-        + (Objects.nonNull(event) && pipeName.equals(event.getPipeName()) ? 1 
: 0);
-  }
-
   //////////////////////////// Error report ////////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 4f56d1a847b..c0fa792da79 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -406,8 +406,8 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
 
     // Transfer deletion
     if (event instanceof PipeDeleteDataNodeEvent) {
-      PipeDeleteDataNodeEvent deleteDataNodeEvent = (PipeDeleteDataNodeEvent) 
event;
-      boolean enqueueResult = addEvent2Buffer(deleteDataNodeEvent);
+      final PipeDeleteDataNodeEvent deleteDataNodeEvent = 
(PipeDeleteDataNodeEvent) event;
+      final boolean enqueueResult = addEvent2Buffer(deleteDataNodeEvent);
       if (!enqueueResult) {
         throw new PipeRuntimeConnectorRetryTimesConfigurableException(
             ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index 01b406674c9..c00527ec38d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -82,10 +82,10 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
   private PipeConsensusSyncBatchReqBuilder tabletBatchBuilder;
 
   public PipeConsensusSyncConnector(
-      List<TEndPoint> peers,
-      int consensusGroupId,
-      int thisDataNodeId,
-      PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics) {
+      final List<TEndPoint> peers,
+      final int consensusGroupId,
+      final int thisDataNodeId,
+      final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics) {
     // In PipeConsensus, one pipeConsensusTask corresponds to a 
pipeConsensusConnector. Thus,
     // `peers` here actually is a singletonList that contains one peer's 
TEndPoint. But here we
     // retain the implementation of list to cope with possible future expansion
@@ -98,7 +98,8 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
   }
 
   @Override
-  public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
+  public void customize(
+      final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
       throws Exception {
     super.customize(parameters, configuration);
     if (isTabletBatchModeEnabled) {
@@ -125,7 +126,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
   }
 
   @Override
-  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
+  public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     // Note: here we don't need to do type judgment here, because 
PipeConsensus uses
     // PIPE_CONSENSUS_PROCESSOR and will not change the event type like
     // 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
@@ -135,12 +136,12 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
           doTransfer();
         }
       } else {
-        long startTime = System.nanoTime();
+        final long startTime = System.nanoTime();
         doTransferWrapper((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent);
-        long duration = System.nanoTime() - startTime;
+        final long duration = System.nanoTime() - startTime;
         pipeConsensusConnectorMetrics.recordRetryWALTransferTimer(duration);
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new PipeConnectionException(
           String.format(
               "Failed to transfer tablet insertion event %s, because %s.",
@@ -150,18 +151,18 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
   }
 
   @Override
-  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
+  public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
     // Note: here we don't need to do type judgment here, because 
PipeConsensus uses DO_NOTHING
     // processor and will not change the event type like
     // 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
     try {
-      long startTime = System.nanoTime();
+      final long startTime = System.nanoTime();
       // In order to commit in order
       if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
         doTransfer();
       }
       doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
-      long duration = System.nanoTime() - startTime;
+      final long duration = System.nanoTime() - startTime;
       pipeConsensusConnectorMetrics.recordRetryTsFileTransferTimer(duration);
     } catch (Exception e) {
       throw new PipeConnectionException(
@@ -173,7 +174,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
   }
 
   @Override
-  public void transfer(Event event) throws Exception {
+  public void transfer(final Event event) throws Exception {
     // in order to commit in order
     if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
       doTransfer();
@@ -209,7 +210,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
       //      }
 
       tabletBatchBuilder.onSuccess();
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new PipeConnectionException(
           String.format(
               PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -240,12 +241,12 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
       throws PipeException {
     final ProgressIndex progressIndex;
     final TPipeConsensusTransferResp resp;
-    TCommitId tCommitId =
+    final TCommitId tCommitId =
         new TCommitId(
             pipeDeleteDataNodeEvent.getCommitId(),
             pipeDeleteDataNodeEvent.getCommitterKey().getRestartTimes(),
             pipeDeleteDataNodeEvent.getRebootTimes());
-    TConsensusGroupId tConsensusGroupId =
+    final TConsensusGroupId tConsensusGroupId =
         new TConsensusGroupId(TConsensusGroupType.DataRegion, 
consensusGroupId);
 
     try (final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient =
@@ -259,7 +260,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
                   tConsensusGroupId,
                   progressIndex,
                   thisDataNodeId));
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new PipeConnectionException(
           String.format(
               PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -310,12 +311,12 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
     final InsertNode insertNode;
     final ProgressIndex progressIndex;
     final TPipeConsensusTransferResp resp;
-    TCommitId tCommitId =
+    final TCommitId tCommitId =
         new TCommitId(
             pipeInsertNodeTabletInsertionEvent.getCommitId(),
             
pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
             pipeInsertNodeTabletInsertionEvent.getRebootTimes());
-    TConsensusGroupId tConsensusGroupId =
+    final TConsensusGroupId tConsensusGroupId =
         new TConsensusGroupId(TConsensusGroupType.DataRegion, 
consensusGroupId);
 
     try (final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient =
@@ -338,7 +339,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
                     progressIndex,
                     thisDataNodeId));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new PipeConnectionException(
           String.format(
               PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -362,7 +363,8 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
     }
   }
 
-  private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) 
throws PipeException {
+  private void doTransfer(final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
+      throws PipeException {
     final File tsFile = pipeTsFileInsertionEvent.getTsFile();
     final File modFile = pipeTsFileInsertionEvent.getModFile();
     final TPipeConsensusTransferResp resp;
@@ -411,7 +413,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
                     pipeTsFileInsertionEvent.getProgressIndex(),
                     thisDataNodeId));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new PipeConnectionException(
           String.format(
               PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -436,11 +438,11 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
   }
 
   protected void transferFilePieces(
-      File file,
-      SyncPipeConsensusServiceClient syncPipeConsensusServiceClient,
-      boolean isMultiFile,
-      TCommitId tCommitId,
-      TConsensusGroupId tConsensusGroupId)
+      final File file,
+      final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient,
+      final boolean isMultiFile,
+      final TCommitId tCommitId,
+      final TConsensusGroupId tConsensusGroupId)
       throws PipeException, IOException {
     final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index d24e097badf..96b07b4a678 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -109,9 +109,20 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
     try {
       final byte[] data = readData(inputStream);
 
+      // If check sum failed, it indicates that the length we read may not be 
correct.
+      // Namely, there may be remaining bytes in the socket stream, which will 
fail any subsequent
+      // attempts to read from that.
+      // We directly close the socket here.
       if (!checkSum(data)) {
-        LOGGER.warn("Checksum failed, receiverId: {}", receiverId);
-        fail();
+        LOGGER.warn(
+            "Pipe air gap receiver {} closed because of checksum failed. 
Socket: {}",
+            receiverId,
+            socket);
+        try {
+          fail();
+        } finally {
+          socket.close();
+        }
         return;
       }
 
@@ -134,21 +145,31 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
           || status.getCode()
               == 
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
         LOGGER.info(
-            "TSStatus:{} is encountered at the air gap receiver, will 
ignore.", resp.getStatus());
+            "Pipe air gap receiver {}: TSStatus {} is encountered at the air 
gap receiver, will ignore.",
+            receiverId,
+            resp.getStatus());
         ok();
       } else {
         LOGGER.warn(
-            "Handle data failed, receiverId: {}, status: {}, req: {}",
+            "Pipe air gap receiver {}: Handle data failed, status: {}, req: 
{}",
             receiverId,
             resp.getStatus(),
             req);
         fail();
       }
     } catch (final PipeConnectionException e) {
-      LOGGER.info("Socket closed when listening to data. Because: {}", 
e.getMessage());
+      LOGGER.info(
+          "Pipe air gap receiver {}: Socket {} closed when listening to data. 
Because: {}",
+          receiverId,
+          socket,
+          e.getMessage());
       socket.close();
     } catch (final Exception e) {
-      LOGGER.warn("Exception during handling receiving, receiverId: {}", 
receiverId, e);
+      LOGGER.warn(
+          "Pipe air gap receiver {}: Exception during handling receiving. 
Socket: {}",
+          receiverId,
+          socket,
+          e);
       fail();
     }
   }
@@ -169,7 +190,17 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
     try {
       final CRC32 crc32 = new CRC32();
       crc32.update(bytes, LONG_LEN, bytes.length - LONG_LEN);
-      return BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN)) 
== crc32.getValue();
+
+      final long expectedChecksum = 
BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN));
+      final long actualChecksum = crc32.getValue();
+      if (expectedChecksum != actualChecksum) {
+        LOGGER.warn(
+            "Pipe air gap receiver {}: checksum failed, expected: {}, actual: 
{}",
+            receiverId,
+            expectedChecksum,
+            actualChecksum);
+      }
+      return expectedChecksum == actualChecksum;
     } catch (final Exception e) {
       // ArrayIndexOutOfBoundsException when bytes.length < LONG_LEN
       return false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
index 3097b0fcac8..644bda1835c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java
@@ -52,7 +52,12 @@ public class IoTDBAirGapReceiverAgent implements IService {
   public void listen() {
     try {
       final Socket socket = serverSocket.accept();
-      new Thread(new IoTDBAirGapReceiver(socket, 
receiverId.incrementAndGet())).start();
+      final long airGapReceiverId = receiverId.incrementAndGet();
+      final Thread airGapReceiverThread =
+          new Thread(new IoTDBAirGapReceiver(socket, airGapReceiverId));
+      airGapReceiverThread.setName(
+          ThreadName.PIPE_AIR_GAP_RECEIVER.getName() + "-" + airGapReceiverId);
+      airGapReceiverThread.start();
     } catch (final IOException e) {
       LOGGER.warn("Unhandled exception during pipe air gap receiver 
listening", e);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
index 9b7587381d9..bf1155b4b58 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
@@ -93,13 +93,4 @@ public class SubscriptionConnectorSubtask extends 
PipeConnectorSubtask {
 
     return SubscriptionAgent.broker().executePrefetch(consumerGroupId, 
topicName);
   }
-
-  //////////////////////////// APIs provided for metric framework 
////////////////////////////
-
-  @Override
-  public int getEventCount(final String pipeName) {
-    // count the number of pipe events in sink queue and prefetching queue, 
note that can safely
-    // ignore lastEvent
-    return SubscriptionAgent.broker().getPipeEventCount(consumerGroupId, 
topicName);
-  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index fee09496730..ec4a1a7c2cd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -144,6 +144,7 @@ public enum ThreadName {
       "Pipe-Runtime-Periodical-Phantom-Reference-Cleaner"),
   PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
   PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
+  PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
   SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
   SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
@@ -295,9 +296,12 @@ public enum ThreadName {
               PIPE_RUNTIME_HEARTBEAT,
               PIPE_RUNTIME_PROCEDURE_SUBMITTER,
               PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR,
+              PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER,
               PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
               PIPE_RECEIVER_AIR_GAP_AGENT,
+              PIPE_AIR_GAP_RECEIVER,
               SUBSCRIPTION_EXECUTOR_POOL,
+              SUBSCRIPTION_RUNTIME_META_SYNCER,
               WINDOW_EVALUATION_SERVICE,
               STATEFUL_TRIGGER_INFORMATION_UPDATER));
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
index d33cfc16514..f5e2d22f158 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
@@ -31,16 +31,16 @@ import java.util.Objects;
 
 public class PipeRuntimeConnectorCriticalException extends 
PipeRuntimeCriticalException {
 
-  public PipeRuntimeConnectorCriticalException(String message) {
+  public PipeRuntimeConnectorCriticalException(final String message) {
     super(message);
   }
 
-  public PipeRuntimeConnectorCriticalException(String message, long timeStamp) 
{
+  public PipeRuntimeConnectorCriticalException(final String message, final 
long timeStamp) {
     super(message, timeStamp);
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     return obj instanceof PipeRuntimeConnectorCriticalException
         && Objects.equals(getMessage(), 
((PipeRuntimeConnectorCriticalException) obj).getMessage())
         && Objects.equals(getTimeStamp(), ((PipeRuntimeException) 
obj).getTimeStamp());
@@ -52,21 +52,21 @@ public class PipeRuntimeConnectorCriticalException extends 
PipeRuntimeCriticalEx
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {
+  public void serialize(final ByteBuffer byteBuffer) {
     
PipeRuntimeExceptionType.CONNECTOR_CRITICAL_EXCEPTION.serialize(byteBuffer);
     ReadWriteIOUtils.write(getMessage(), byteBuffer);
     ReadWriteIOUtils.write(getTimeStamp(), byteBuffer);
   }
 
   @Override
-  public void serialize(OutputStream stream) throws IOException {
+  public void serialize(final OutputStream stream) throws IOException {
     PipeRuntimeExceptionType.CONNECTOR_CRITICAL_EXCEPTION.serialize(stream);
     ReadWriteIOUtils.write(getMessage(), stream);
     ReadWriteIOUtils.write(getTimeStamp(), stream);
   }
 
   public static PipeRuntimeConnectorCriticalException deserializeFrom(
-      PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
+      final PipeRuntimeMetaVersion version, final ByteBuffer byteBuffer) {
     final String message = ReadWriteIOUtils.readString(byteBuffer);
     switch (version) {
       case VERSION_1:
@@ -80,7 +80,7 @@ public class PipeRuntimeConnectorCriticalException extends 
PipeRuntimeCriticalEx
   }
 
   public static PipeRuntimeConnectorCriticalException deserializeFrom(
-      PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
+      final PipeRuntimeMetaVersion version, final InputStream stream) throws 
IOException {
     final String message = ReadWriteIOUtils.readString(stream);
     switch (version) {
       case VERSION_1:
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
index d5c70b9eb90..42244ea7d4b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorRetryTimesConfigurableException.java
@@ -24,7 +24,8 @@ public class 
PipeRuntimeConnectorRetryTimesConfigurableException
 
   private final int retryTimes;
 
-  public PipeRuntimeConnectorRetryTimesConfigurableException(String message, 
int retryTimes) {
+  public PipeRuntimeConnectorRetryTimesConfigurableException(
+      final String message, final int retryTimes) {
     super(message);
     this.retryTimes = retryTimes;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
index aa725d94f74..c6db38e23fa 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
@@ -31,37 +31,37 @@ import java.util.Objects;
 
 public class PipeRuntimeCriticalException extends PipeRuntimeException {
 
-  public PipeRuntimeCriticalException(String message) {
+  public PipeRuntimeCriticalException(final String message) {
     super(message);
   }
 
-  public PipeRuntimeCriticalException(String message, long timeStamp) {
+  public PipeRuntimeCriticalException(final String message, final long 
timeStamp) {
     super(message, timeStamp);
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     return obj instanceof PipeRuntimeCriticalException
         && Objects.equals(getMessage(), ((PipeRuntimeCriticalException) 
obj).getMessage())
         && Objects.equals(getTimeStamp(), ((PipeRuntimeException) 
obj).getTimeStamp());
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {
+  public void serialize(final ByteBuffer byteBuffer) {
     PipeRuntimeExceptionType.CRITICAL_EXCEPTION.serialize(byteBuffer);
     ReadWriteIOUtils.write(getMessage(), byteBuffer);
     ReadWriteIOUtils.write(getTimeStamp(), byteBuffer);
   }
 
   @Override
-  public void serialize(OutputStream stream) throws IOException {
+  public void serialize(final OutputStream stream) throws IOException {
     PipeRuntimeExceptionType.CRITICAL_EXCEPTION.serialize(stream);
     ReadWriteIOUtils.write(getMessage(), stream);
     ReadWriteIOUtils.write(getTimeStamp(), stream);
   }
 
   public static PipeRuntimeCriticalException deserializeFrom(
-      PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
+      final PipeRuntimeMetaVersion version, final ByteBuffer byteBuffer) {
     final String message = ReadWriteIOUtils.readString(byteBuffer);
     switch (version) {
       case VERSION_1:
@@ -74,7 +74,7 @@ public class PipeRuntimeCriticalException extends 
PipeRuntimeException {
   }
 
   public static PipeRuntimeCriticalException deserializeFrom(
-      PipeRuntimeMetaVersion version, InputStream stream) throws IOException {
+      final PipeRuntimeMetaVersion version, final InputStream stream) throws 
IOException {
     final String message = ReadWriteIOUtils.readString(stream);
     switch (version) {
       case VERSION_1:
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
index 3bcc96b8e85..24fa4a1edda 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
@@ -28,16 +28,16 @@ import java.util.Objects;
 
 public abstract class PipeRuntimeException extends PipeException {
 
-  protected PipeRuntimeException(String message) {
+  protected PipeRuntimeException(final String message) {
     super(message);
   }
 
-  protected PipeRuntimeException(String message, long timeStamp) {
+  protected PipeRuntimeException(final String message, final long timeStamp) {
     super(message, timeStamp);
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     return obj instanceof PipeRuntimeException
         && Objects.equals(getMessage(), ((PipeRuntimeException) 
obj).getMessage())
         && Objects.equals(getTimeStamp(), ((PipeRuntimeException) 
obj).getTimeStamp());
@@ -48,7 +48,7 @@ public abstract class PipeRuntimeException extends 
PipeException {
     return Objects.hash(getMessage(), getTimeStamp());
   }
 
-  public abstract void serialize(ByteBuffer byteBuffer);
+  public abstract void serialize(final ByteBuffer byteBuffer);
 
-  public abstract void serialize(OutputStream stream) throws IOException;
+  public abstract void serialize(final OutputStream stream) throws IOException;
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 8972fb424b3..a404b202cfd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -40,6 +40,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -62,7 +63,7 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
 
     private final TEndPoint endPoint;
 
-    public AirGapSocket(String ip, int port) {
+    public AirGapSocket(final String ip, final int port) {
       this.endPoint = new TEndPoint(ip, port);
     }
 
@@ -292,7 +293,8 @@ public abstract class IoTDBAirGapConnector extends 
IoTDBConnector {
       final String pipeName, final long creationTime, final AirGapSocket 
socket, byte[] bytes)
       throws IOException {
     if (!socket.isConnected()) {
-      return false;
+      throw new SocketException(
+          String.format("Socket %s is closed, will try to handshake", socket));
     }
 
     bytes = compressIfNeeded(bytes);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 1ab1be21454..872c1bce436 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -288,16 +288,18 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     try {
       updateWritingFileIfNeeded(req.getFileName(), isSingleFile);
 
+      // If the request is through air gap, the sender may resend file pieces that
+      // have already been written. So the receiver should truncate the writing file
+      // back to the start offset carried by the request.
+      if (isRequestThroughAirGap && req.getStartWritingOffset() < 
writingFileWriter.length()) {
+        writingFileWriter.setLength(req.getStartWritingOffset());
+      }
+
       if (!isWritingFileOffsetCorrect(req.getStartWritingOffset())) {
-        if (isRequestThroughAirGap
-            || !writingFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
-          // 1. If the request is through air gap, the sender will resend the 
file piece from the
-          // beginning of the file. So the receiver should reset the offset of 
the writing file to
-          // the beginning of the file.
-          // 2. If the file is a tsFile, then the content will not be changed 
for a specific
-          // filename. However, for other files (mod, snapshot, etc.) the 
content varies for the
-          // same name in different times, then we must rewrite the file to 
apply the newest
-          // version.
+        if (!writingFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+          // If the file is a tsFile, then the content will not be changed for a
+          // specific filename. However, for other files (mod, snapshot, etc.) the content
+          // varies for the same name at different times, so we must rewrite the file to apply the newest version.
           writingFileWriter.setLength(0);
         }
 
@@ -317,7 +319,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       writingFileWriter.write(req.getFilePiece());
       return PipeTransferFilePieceResp.toTPipeTransferResp(
           RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           "Receiver id = {}: Failed to write file piece from req {}.", 
receiverId.get(), req, e);
       final TSStatus status =
@@ -377,8 +379,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
         writingFile.getPath());
   }
 
-  private boolean isFileExistedAndNameCorrect(String fileName) {
-    return writingFile != null && writingFile.getName().equals(fileName);
+  private boolean isFileExistedAndNameCorrect(final String fileName) {
+    return writingFile != null && writingFile.exists() && 
writingFile.getName().equals(fileName);
   }
 
   private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) {
@@ -393,7 +395,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
             receiverId.get(),
             writingFile == null ? "null" : writingFile.getPath(),
             writingFile == null ? 0 : writingFile.length());
-      } catch (Exception e) {
+      } catch (final Exception e) {
         LOGGER.warn(
             "Receiver id = {}: Failed to close current writing file writer {}, 
because {}.",
             receiverId.get(),
@@ -414,6 +416,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   private void deleteCurrentWritingFile() {
     if (writingFile != null) {
       deleteFile(writingFile);
+      writingFile = null;
     } else {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
@@ -422,7 +425,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     }
   }
 
-  private void deleteFile(File file) {
+  private void deleteFile(final File file) {
     if (file.exists()) {
       try {
         FileUtils.delete(file);
@@ -430,7 +433,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
             "Receiver id = {}: Original writing file {} was deleted.",
             receiverId.get(),
             file.getPath());
-      } catch (Exception e) {
+      } catch (final Exception e) {
         LOGGER.warn(
             "Receiver id = {}: Failed to delete original writing file {}, 
because {}.",
             receiverId.get(),
@@ -512,7 +515,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
             status.getMessage());
       }
       return new TPipeTransferResp(status);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           "Receiver id = {}: Failed to seal file {} from req {}.",
           receiverId.get(),
@@ -597,7 +600,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
             status);
       }
       return new TPipeTransferResp(status);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           "Receiver id = {}: Failed to seal file {} from req {}.", 
receiverId.get(), files, req, e);
       return new TPipeTransferResp(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
index 97e582f2407..c09812f2003 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
@@ -104,7 +104,7 @@ public abstract class IoTDBReceiverAgent {
     handleClientExit(null);
   }
 
-  public final void handleClientExit(String key) {
+  public final void handleClientExit(final String key) {
     final IoTDBReceiver receiver = getReceiverWithSpecifiedClient(key);
     if (receiver != null) {
       receiver.handleExit();
@@ -116,14 +116,14 @@ public abstract class IoTDBReceiverAgent {
     try {
       FileUtils.deleteDirectory(receiverFileDir);
       LOGGER.info("Clean pipe receiver dir {} successfully.", receiverFileDir);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn("Clean pipe receiver dir {} failed.", receiverFileDir, e);
     }
 
     try {
       FileUtils.forceMkdir(receiverFileDir);
       LOGGER.info("Create pipe receiver dir {} successfully.", 
receiverFileDir);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       LOGGER.warn("Create pipe receiver dir {} failed.", receiverFileDir, e);
     }
   }


Reply via email to