This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 278da484343 Pipe: add decompressed length in RPC compression payload
to avoid potential OOM on receiver (#12701)
278da484343 is described below
commit 278da484343b7f38329baafa6d2a9194f420a792
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Jun 11 20:44:00 2024 +0800
Pipe: add decompressed length in RPC compression payload to avoid potential
OOM on receiver (#12701)
---
.../pipe/connector/compressor/PipeCompressor.java | 19 ++++++++++++
.../connector/compressor/PipeGZIPCompressor.java | 7 +++++
.../connector/compressor/PipeLZ4Compressor.java | 7 +++++
.../connector/compressor/PipeLZMA2Compressor.java | 7 +++++
.../connector/compressor/PipeSnappyCompressor.java | 7 +++++
.../connector/compressor/PipeZSTDCompressor.java | 5 +++
.../thrift/request/PipeTransferCompressedReq.java | 36 ++++++++++++----------
7 files changed, 72 insertions(+), 16 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
index 4d5113e4c20..a7f8405f701 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressor.java
@@ -49,8 +49,27 @@ public abstract class PipeCompressor {
public abstract byte[] compress(byte[] data) throws IOException;
+ /**
+ * Decompress the byte array to a byte array. NOTE: the length of the
decompressed byte array is
+ * not provided in this method, and some decompressors (LZ4) may construct
large byte arrays,
+ * leading to potential OOM.
+ *
+ * @param byteArray the byte array to be decompressed
+ * @return the decompressed byte array
+ * @throws IOException
+ */
public abstract byte[] decompress(byte[] byteArray) throws IOException;
+ /**
+ * Decompress the byte array to a byte array with a known length.
+ *
+ * @param byteArray the byte array to be decompressed
+ * @param decompressedLength the length of the decompressed byte array
+ * @return the decompressed byte array
+ * @throws IOException
+ */
+ public abstract byte[] decompress(byte[] byteArray, int decompressedLength)
throws IOException;
+
public byte serialize() {
return compressionType.getIndex();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
index 2cc1887555c..568122d7035 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeGZIPCompressor.java
@@ -44,4 +44,11 @@ public class PipeGZIPCompressor extends PipeCompressor {
public byte[] decompress(byte[] byteArray) throws IOException {
return DECOMPRESSOR.uncompress(byteArray);
}
+
+ @Override
+ public byte[] decompress(byte[] byteArray, int decompressedLength) throws
IOException {
+ byte[] uncompressed = new byte[decompressedLength];
+ DECOMPRESSOR.uncompress(byteArray, 0, byteArray.length, uncompressed, 0);
+ return uncompressed;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
index 5d6db90e661..52a2412a30f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZ4Compressor.java
@@ -44,4 +44,11 @@ public class PipeLZ4Compressor extends PipeCompressor {
public byte[] decompress(byte[] byteArray) throws IOException {
return DECOMPRESSOR.uncompress(byteArray);
}
+
+ @Override
+ public byte[] decompress(byte[] byteArray, int decompressedLength) throws
IOException {
+ byte[] uncompressed = new byte[decompressedLength];
+ DECOMPRESSOR.uncompress(byteArray, 0, byteArray.length, uncompressed, 0);
+ return uncompressed;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
index 9e7c0a8b8d3..eebb703f9be 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeLZMA2Compressor.java
@@ -44,4 +44,11 @@ public class PipeLZMA2Compressor extends PipeCompressor {
public byte[] decompress(byte[] byteArray) throws IOException {
return DECOMPRESSOR.uncompress(byteArray);
}
+
+ @Override
+ public byte[] decompress(byte[] byteArray, int decompressedLength) throws
IOException {
+ byte[] uncompressed = new byte[decompressedLength];
+ DECOMPRESSOR.uncompress(byteArray, 0, byteArray.length, uncompressed, 0);
+ return uncompressed;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
index 72333c1e929..284b29610a4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeSnappyCompressor.java
@@ -44,4 +44,11 @@ public class PipeSnappyCompressor extends PipeCompressor {
public byte[] decompress(byte[] byteArray) throws IOException {
return DECOMPRESSOR.uncompress(byteArray);
}
+
+ @Override
+ public byte[] decompress(byte[] byteArray, int decompressedLength) throws
IOException {
+ byte[] uncompressed = new byte[decompressedLength];
+ DECOMPRESSOR.uncompress(byteArray, 0, byteArray.length, uncompressed, 0);
+ return uncompressed;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
index 50e2e1f845c..f5e4d6476e8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
@@ -41,4 +41,9 @@ public class PipeZSTDCompressor extends PipeCompressor {
public byte[] decompress(byte[] byteArray) {
return Zstd.decompress(byteArray, (int) Zstd.decompressedSize(byteArray,
0, byteArray.length));
}
+
+ @Override
+ public byte[] decompress(byte[] byteArray, int decompressedLength) {
+ return Zstd.decompress(byteArray, decompressedLength);
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
index f5e114949bd..b92b2d4256a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferCompressedReq.java
@@ -44,8 +44,10 @@ public class PipeTransferCompressedReq extends
TPipeTransferReq {
// version
// type: TRANSFER_COMPRESSED
// body:
- // (byte) count of compressors
- // (bytes) 1 byte for each compressor
+ // (byte) count of compressors (n)
+ // (n*3 bytes) for each compressor:
+ // (byte) compressor type
+ // (int) length of uncompressed bytes
// compressed req:
// (byte) version
// (2 bytes) type
@@ -56,18 +58,17 @@ public class PipeTransferCompressedReq extends
TPipeTransferReq {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- ReadWriteIOUtils.write((byte) compressors.size(), outputStream);
- for (final PipeCompressor compressor : compressors) {
- ReadWriteIOUtils.write(compressor.serialize(), outputStream);
- }
-
byte[] body =
BytesUtils.concatByteArrayList(
Arrays.asList(
new byte[] {originalReq.version},
BytesUtils.shortToBytes(originalReq.type),
originalReq.getBody()));
+
+ ReadWriteIOUtils.write((byte) compressors.size(), outputStream);
for (final PipeCompressor compressor : compressors) {
+ ReadWriteIOUtils.write(compressor.serialize(), outputStream);
+ ReadWriteIOUtils.write(body.length, outputStream);
body = compressor.compress(body);
}
outputStream.write(body);
@@ -84,17 +85,19 @@ public class PipeTransferCompressedReq extends
TPipeTransferReq {
final ByteBuffer compressedBuffer = transferReq.body;
final List<PipeCompressor> compressors = new ArrayList<>();
+ final List<Integer> uncompressedLengths = new ArrayList<>();
final int compressorsSize = ReadWriteIOUtils.readByte(compressedBuffer);
for (int i = 0; i < compressorsSize; ++i) {
compressors.add(
PipeCompressorFactory.getCompressor(ReadWriteIOUtils.readByte(compressedBuffer)));
+ uncompressedLengths.add(ReadWriteIOUtils.readInt(compressedBuffer));
}
byte[] body = new byte[compressedBuffer.remaining()];
compressedBuffer.get(body);
for (int i = compressors.size() - 1; i >= 0; --i) {
- body = compressors.get(i).decompress(body);
+ body = compressors.get(i).decompress(body, uncompressedLengths.get(i));
}
final ByteBuffer decompressedBuffer = ByteBuffer.wrap(body);
@@ -115,26 +118,27 @@ public class PipeTransferCompressedReq extends
TPipeTransferReq {
// The generated bytes consists of:
// (byte) version
// (2 bytes) type: TRANSFER_COMPRESSED
- // (byte) count of compressors
- // (bytes) 1 byte for each compressor
+ // (byte) count of compressors (n)
+ // (n*3 bytes) for each compressor:
+ // (byte) compressor type
+ // (int) length of uncompressed bytes
// compressed req:
// (byte) version
// (2 bytes) type
// (bytes) body
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ byte[] body = rawReqInBytes;
+
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
outputStream);
ReadWriteIOUtils.write(PipeRequestType.TRANSFER_COMPRESSED.getType(),
outputStream);
ReadWriteIOUtils.write((byte) compressors.size(), outputStream);
for (final PipeCompressor compressor : compressors) {
ReadWriteIOUtils.write(compressor.serialize(), outputStream);
+ ReadWriteIOUtils.write(body.length, outputStream);
+ body = compressor.compress(body);
}
-
- byte[] compressedReq = rawReqInBytes;
- for (final PipeCompressor compressor : compressors) {
- compressedReq = compressor.compress(compressedReq);
- }
- outputStream.write(compressedReq);
+ outputStream.write(body);
return byteArrayOutputStream.toByteArray();
}