This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 51ffb45e0bb [fix][broker] Increase readBuffer size for
bookkeeper.DLOutputStream (#23548)
51ffb45e0bb is described below
commit 51ffb45e0bb80e87112c710a29955eaba0bdf989
Author: jiangpengcheng <[email protected]>
AuthorDate: Tue Nov 5 08:19:22 2024 +0800
[fix][broker] Increase readBuffer size for bookkeeper.DLOutputStream
(#23548)
(cherry picked from commit 7a4788895e31dcd794fcb89b3af2bc36fa221343)
---
.../management/storage/bookkeeper/DLOutputStream.java | 17 +++++++++++++----
.../storage/bookkeeper/DLOutputStreamTest.java | 2 +-
2 files changed, 14 insertions(+), 5 deletions(-)
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
index 67345ebd47e..f446961c1d8 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
@@ -36,7 +36,16 @@ class DLOutputStream {
private final DistributedLogManager distributedLogManager;
private final AsyncLogWriter writer;
- private final byte[] readBuffer = new byte[8192];
+ /*
+ * The LogRecord structure is:
+ * -------------------
+ * Bytes 0 - 7 : Metadata (Long)
+ * Bytes 8 - 15 : TxId (Long)
+ * Bytes 16 - 19 : Payload length (Integer)
+ * Bytes 20 - 20+payload.length-1 : Payload (Byte[])
+ * So the max buffer size should be LogRecord.MAX_LOGRECORD_SIZE - 2 * (Long.SIZE / 8) - Integer.SIZE / 8
+ */
+ private final byte[] readBuffer = new byte[LogRecord.MAX_LOGRECORD_SIZE - 2 * (Long.SIZE / 8) - Integer.SIZE / 8];
private long offset = 0L;
private DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter writer) {
@@ -51,9 +60,9 @@ class DLOutputStream {
private void writeAsyncHelper(InputStream is, CompletableFuture<DLOutputStream> result) {
try {
- int read = is.read(readBuffer);
- if (read != -1) {
- log.info("write something into the ledgers offset: {}, length: {}", offset, read);
+ int read = is.readNBytes(readBuffer, 0, readBuffer.length);
+ if (read > 0) {
+ log.debug("write something into the ledgers offset: {}, length: {}", offset, read);
final ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read);
offset += writeBuf.readableBytes();
final LogRecord record = new LogRecord(offset, writeBuf);
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
index b55e0e0d34a..235cb4fefc0 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
@@ -99,7 +99,7 @@ public class DLOutputStreamTest {
@Test
public void writeLongBytesArrayData() throws ExecutionException, InterruptedException {
- byte[] data = new byte[8192 * 3 + 4096];
+ byte[] data = new byte[1040364 * 3 + 4096];
DLOutputStream.openWriterAsync(dlm)
.thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
.thenCompose(DLOutputStream::closeAsync)).get();