This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7cd361d0f7f Pipe: Fixed air gap receiver non-closing problem when
client exits in sender side (#12425)
7cd361d0f7f is described below
commit 7cd361d0f7fd9d06d931915193e3790a85a051bc
Author: Caideyipi <[email protected]>
AuthorDate: Fri Apr 26 18:06:02 2024 +0800
Pipe: Fixed air gap receiver non-closing problem when client exits in
sender side (#12425)
---
.../db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index d07f180763c..99d454de382 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -59,7 +59,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
private boolean isELanguagePayload;
- public IoTDBAirGapReceiver(Socket socket, long receiverId) {
+ public IoTDBAirGapReceiver(final Socket socket, final long receiverId) {
this.socket = socket;
this.receiverId = receiverId;
@@ -82,7 +82,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
"Pipe air gap receiver {} closed because socket is closed. Socket:
{}",
receiverId,
socket);
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
"Pipe air gap receiver {} closed because of exception. Socket: {}",
receiverId,
@@ -138,6 +138,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
}
} catch (final PipeConnectionException e) {
LOGGER.info("Socket closed when listening to data. Because: {}",
e.getMessage());
+ socket.close();
} catch (final Exception e) {
LOGGER.warn("Exception during handling receiving, receiverId: {}",
receiverId, e);
fail();
@@ -161,13 +162,13 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
final CRC32 crc32 = new CRC32();
crc32.update(bytes, LONG_LEN, bytes.length - LONG_LEN);
return BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN))
== crc32.getValue();
- } catch (Exception e) {
+ } catch (final Exception e) {
// ArrayIndexOutOfBoundsException when bytes.length < LONG_LEN
return false;
}
}
- private byte[] readData(InputStream inputStream) throws IOException {
+ private byte[] readData(final InputStream inputStream) throws IOException {
final int length = readLength(inputStream);
if (length <= 0) {
@@ -187,7 +188,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
* Read the length of the following data. The thread may typically block
here when there is no
* data to read.
*/
- private int readLength(InputStream inputStream) throws IOException {
+ private int readLength(final InputStream inputStream) throws IOException {
final byte[] doubleIntLengthBytes = new byte[2 * INT_LEN];
readTillFull(inputStream, doubleIntLengthBytes);