This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 278da484343 Pipe: add decompressed length in RPC compression payload to avoid potential OOM on receiver (#12701)
278da484343 is described below

commit 278da484343b7f38329baafa6d2a9194f420a792
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Jun 11 20:44:00 2024 +0800

    Pipe: add decompressed length in RPC compression payload to avoid potential OOM on receiver (#12701)
---
 .../pipe/connector/compressor/PipeCompressor.java  | 19 ++++++++++++
 .../connector/compressor/PipeGZIPCompressor.java   |  7 +++++
 .../connector/compressor/PipeLZ4Compressor.java    |  7 +++++
 .../connector/compressor/PipeLZMA2Compressor.java  |  7 +++++
 .../connector/compressor/PipeSnappyCompressor.java |  7 +++++
 .../connector/compressor/PipeZSTDCompressor.java   |  5 +++
 .../thrift/request/PipeTransferCompressedReq.java  | 36 ++++++++++++----------
 7 files changed, 72 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
index 4d5113e4c20..a7f8405f701 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
@@ -49,8 +49,27 @@ public abstract class PipeCompressor {
 
   public abstract byte[] compress(byte[] data) throws IOException;
 
+  /**
+   * Decompress the byte array to a byte array. NOTE: the length of the decompressed byte array is
+   * not provided in this method, and some decompressors (LZ4) may construct large byte arrays,
+   * leading to potential OOM.
+   *
+   * @param byteArray the byte array to be decompressed
+   * @return the decompressed byte array
+   * @throws IOException if the byte array cannot be decompressed
+   */
   public abstract byte[] decompress(byte[] byteArray) throws IOException;
 
+  /**
+   * Decompress the byte array to a byte array with a known length.
+   *
+   * @param byteArray the byte array to be decompressed
+   * @param decompressedLength the length of the decompressed byte array
+   * @return the decompressed byte array
+   * @throws IOException if the byte array cannot be decompressed
+   */
+  public abstract byte[] decompress(byte[] byteArray, int decompressedLength) 
throws IOException;
+
   public byte serialize() {
     return compressionType.getIndex();
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
index 2cc1887555c..568122d7035 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
@@ -44,4 +44,11 @@ public class PipeGZIPCompressor extends PipeCompressor {
   public byte[] decompress(byte[] byteArray) throws IOException {
     return DECOMPRESSOR.uncompress(byteArray);
   }
+
+  @Override
+  public byte[] decompress(byte[] byteArray, int decompressedLength) throws 
IOException {
+    byte[] uncompressed = new byte[decompressedLength];
+    DECOMPRESSOR.uncompress(byteArray, 0, byteArray.length, uncompressed, 0);
+    return uncompressed;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
index 5d6db90e661..52a2412a30f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
@@ -44,4 +44,11 @@ public class PipeLZ4Compressor extends PipeCompressor {
   public byte[] decompress(byte[] byteArray) throws IOException {
     return DECOMPRESSOR.uncompress(byteArray);
   }
+
+  @Override
+  public byte[] decompress(byte[] byteArray, int decompressedLength) throws 
IOException {
+    byte[] uncompressed = new byte[decompressedLength];
+    DECOMPRESSOR.uncompress(byteArray, 0, byteArray.length, uncompressed, 0);
+    return uncompressed;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
index 9e7c0a8b8d3..eebb703f9be 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
@@ -44,4 +44,11 @@ public class PipeLZMA2Compressor extends PipeCompressor {
   public byte[] decompress(byte[] byteArray) throws IOException {
     return DECOMPRESSOR.uncompress(byteArray);
   }
+
+  @Override
+  public byte[] decompress(byte[] byteArray, int decompressedLength) throws 
IOException {
+    byte[] uncompressed = new byte[decompressedLength];
+    DECOMPRESSOR.uncompress(byteArray, 0, byteArray.length, uncompressed, 0);
+    return uncompressed;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
index 72333c1e929..284b29610a4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
@@ -44,4 +44,11 @@ public class PipeSnappyCompressor extends PipeCompressor {
   public byte[] decompress(byte[] byteArray) throws IOException {
     return DECOMPRESSOR.uncompress(byteArray);
   }
+
+  @Override
+  public byte[] decompress(byte[] byteArray, int decompressedLength) throws 
IOException {
+    byte[] uncompressed = new byte[decompressedLength];
+    DECOMPRESSOR.uncompress(byteArray, 0, byteArray.length, uncompressed, 0);
+    return uncompressed;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
index 50e2e1f845c..f5e4d6476e8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
@@ -41,4 +41,9 @@ public class PipeZSTDCompressor extends PipeCompressor {
   public byte[] decompress(byte[] byteArray) {
     return Zstd.decompress(byteArray, (int) Zstd.decompressedSize(byteArray, 
0, byteArray.length));
   }
+
+  @Override
+  public byte[] decompress(byte[] byteArray, int decompressedLength) {
+    return Zstd.decompress(byteArray, decompressedLength);
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
index f5e114949bd..b92b2d4256a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
@@ -44,8 +44,10 @@ public class PipeTransferCompressedReq extends 
TPipeTransferReq {
     // version
     // type: TRANSFER_COMPRESSED
     // body:
-    //   (byte) count of compressors
-    //   (bytes) 1 byte for each compressor
+    //   (byte) count of compressors (n)
+    //   (n*5 bytes) for each compressor:
+    //     (byte) compressor type
+    //     (int) length of uncompressed bytes
     //   compressed req:
     //     (byte) version
     //     (2 bytes) type
@@ -56,18 +58,17 @@ public class PipeTransferCompressedReq extends 
TPipeTransferReq {
 
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-      ReadWriteIOUtils.write((byte) compressors.size(), outputStream);
-      for (final PipeCompressor compressor : compressors) {
-        ReadWriteIOUtils.write(compressor.serialize(), outputStream);
-      }
-
       byte[] body =
           BytesUtils.concatByteArrayList(
               Arrays.asList(
                   new byte[] {originalReq.version},
                   BytesUtils.shortToBytes(originalReq.type),
                   originalReq.getBody()));
+
+      ReadWriteIOUtils.write((byte) compressors.size(), outputStream);
       for (final PipeCompressor compressor : compressors) {
+        ReadWriteIOUtils.write(compressor.serialize(), outputStream);
+        ReadWriteIOUtils.write(body.length, outputStream);
         body = compressor.compress(body);
       }
       outputStream.write(body);
@@ -84,17 +85,19 @@ public class PipeTransferCompressedReq extends 
TPipeTransferReq {
     final ByteBuffer compressedBuffer = transferReq.body;
 
     final List<PipeCompressor> compressors = new ArrayList<>();
+    final List<Integer> uncompressedLengths = new ArrayList<>();
     final int compressorsSize = ReadWriteIOUtils.readByte(compressedBuffer);
     for (int i = 0; i < compressorsSize; ++i) {
       compressors.add(
           
PipeCompressorFactory.getCompressor(ReadWriteIOUtils.readByte(compressedBuffer)));
+      uncompressedLengths.add(ReadWriteIOUtils.readInt(compressedBuffer));
     }
 
     byte[] body = new byte[compressedBuffer.remaining()];
     compressedBuffer.get(body);
 
     for (int i = compressors.size() - 1; i >= 0; --i) {
-      body = compressors.get(i).decompress(body);
+      body = compressors.get(i).decompress(body, uncompressedLengths.get(i));
     }
 
     final ByteBuffer decompressedBuffer = ByteBuffer.wrap(body);
@@ -115,26 +118,27 @@ public class PipeTransferCompressedReq extends 
TPipeTransferReq {
     // The generated bytes consists of:
     // (byte) version
     // (2 bytes) type: TRANSFER_COMPRESSED
-    // (byte) count of compressors
-    // (bytes) 1 byte for each compressor
+    // (byte) count of compressors (n)
+    // (n*5 bytes) for each compressor:
+    //   (byte) compressor type
+    //   (int) length of uncompressed bytes
     // compressed req:
     //   (byte) version
     //   (2 bytes) type
     //   (bytes) body
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      byte[] body = rawReqInBytes;
+
       
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
       ReadWriteIOUtils.write(PipeRequestType.TRANSFER_COMPRESSED.getType(), 
outputStream);
       ReadWriteIOUtils.write((byte) compressors.size(), outputStream);
       for (final PipeCompressor compressor : compressors) {
         ReadWriteIOUtils.write(compressor.serialize(), outputStream);
+        ReadWriteIOUtils.write(body.length, outputStream);
+        body = compressor.compress(body);
       }
-
-      byte[] compressedReq = rawReqInBytes;
-      for (final PipeCompressor compressor : compressors) {
-        compressedReq = compressor.compress(compressedReq);
-      }
-      outputStream.write(compressedReq);
+      outputStream.write(body);
 
       return byteArrayOutputStream.toByteArray();
     }

Reply via email to