This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cd361d0f7f Pipe: Fixed air gap receiver non-closing problem when client exits in sender side (#12425)
7cd361d0f7f is described below

commit 7cd361d0f7fd9d06d931915193e3790a85a051bc
Author: Caideyipi <[email protected]>
AuthorDate: Fri Apr 26 18:06:02 2024 +0800

    Pipe: Fixed air gap receiver non-closing problem when client exits in sender side (#12425)
---
 .../db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index d07f180763c..99d454de382 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -59,7 +59,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
 
   private boolean isELanguagePayload;
 
-  public IoTDBAirGapReceiver(Socket socket, long receiverId) {
+  public IoTDBAirGapReceiver(final Socket socket, final long receiverId) {
     this.socket = socket;
     this.receiverId = receiverId;
 
@@ -82,7 +82,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
           "Pipe air gap receiver {} closed because socket is closed. Socket: 
{}",
           receiverId,
           socket);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           "Pipe air gap receiver {} closed because of exception. Socket: {}",
           receiverId,
@@ -138,6 +138,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
       }
     } catch (final PipeConnectionException e) {
       LOGGER.info("Socket closed when listening to data. Because: {}", 
e.getMessage());
+      socket.close();
     } catch (final Exception e) {
       LOGGER.warn("Exception during handling receiving, receiverId: {}", 
receiverId, e);
       fail();
@@ -161,13 +162,13 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
       final CRC32 crc32 = new CRC32();
       crc32.update(bytes, LONG_LEN, bytes.length - LONG_LEN);
       return BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN)) 
== crc32.getValue();
-    } catch (Exception e) {
+    } catch (final Exception e) {
       // ArrayIndexOutOfBoundsException when bytes.length < LONG_LEN
       return false;
     }
   }
 
-  private byte[] readData(InputStream inputStream) throws IOException {
+  private byte[] readData(final InputStream inputStream) throws IOException {
     final int length = readLength(inputStream);
 
     if (length <= 0) {
@@ -187,7 +188,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
    * Read the length of the following data. The thread may typically block 
here when there is no
    * data to read.
    */
-  private int readLength(InputStream inputStream) throws IOException {
+  private int readLength(final InputStream inputStream) throws IOException {
     final byte[] doubleIntLengthBytes = new byte[2 * INT_LEN];
     readTillFull(inputStream, doubleIntLengthBytes);
 

Reply via email to