This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new f70e52a700d [fix][broker] Fix broker OOM when upload a large package.
(#22989)
f70e52a700d is described below
commit f70e52a700d3348dbdb2615495a2beb16c790f23
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Jul 2 08:46:56 2024 +0800
[fix][broker] Fix broker OOM when upload a large package. (#22989)
(cherry picked from commit da2a1910a32e622ea609ff7b9e91711ecaf36de6)
---
.../storage/bookkeeper/DLOutputStream.java | 53 ++++++++++------------
.../storage/bookkeeper/DLOutputStreamTest.java | 14 +++---
2 files changed, 31 insertions(+), 36 deletions(-)
diff --git
a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
index 222987aa49d..67345ebd47e 100644
---
a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
+++
b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
@@ -22,8 +22,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.LogRecord;
@@ -38,6 +36,7 @@ class DLOutputStream {
private final DistributedLogManager distributedLogManager;
private final AsyncLogWriter writer;
+ private final byte[] readBuffer = new byte[8192];
private long offset = 0L;
private DLOutputStream(DistributedLogManager distributedLogManager,
AsyncLogWriter writer) {
@@ -50,42 +49,38 @@ class DLOutputStream {
return distributedLogManager.openAsyncLogWriter().thenApply(w -> new
DLOutputStream(distributedLogManager, w));
}
- private CompletableFuture<List<LogRecord>> getRecords(InputStream
inputStream) {
- CompletableFuture<List<LogRecord>> future = new CompletableFuture<>();
- CompletableFuture.runAsync(() -> {
- byte[] readBuffer = new byte[8192];
- List<LogRecord> records = new ArrayList<>();
- try {
- int read = 0;
- while ((read = inputStream.read(readBuffer)) != -1) {
- log.info("write something into the ledgers offset: {},
length: {}", offset, read);
- ByteBuf writeBuf = Unpooled.copiedBuffer(readBuffer, 0,
read);
- offset += writeBuf.readableBytes();
- LogRecord record = new LogRecord(offset, writeBuf);
- records.add(record);
- }
- future.complete(records);
- } catch (IOException e) {
- log.error("Failed to get all records from the input stream",
e);
- future.completeExceptionally(e);
+ private void writeAsyncHelper(InputStream is,
CompletableFuture<DLOutputStream> result) {
+ try {
+ int read = is.read(readBuffer);
+ if (read != -1) {
+ log.info("write something into the ledgers offset: {}, length:
{}", offset, read);
+ final ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0,
read);
+ offset += writeBuf.readableBytes();
+ final LogRecord record = new LogRecord(offset, writeBuf);
+ writer.write(record).thenAccept(v -> writeAsyncHelper(is,
result))
+ .exceptionally(e -> {
+ result.completeExceptionally(e);
+ return null;
+ });
+ } else {
+ result.complete(this);
}
- });
- return future;
+ } catch (IOException e) {
+ log.error("Failed to get all records from the input stream", e);
+ result.completeExceptionally(e);
+ }
}
/**
* Write all input stream data to the distribute log.
*
* @param inputStream the data we need to write
- * @return
+ * @return CompletableFuture<DLOutputStream>
*/
CompletableFuture<DLOutputStream> writeAsync(InputStream inputStream) {
- return getRecords(inputStream)
- .thenCompose(this::writeAsync);
- }
-
- private CompletableFuture<DLOutputStream> writeAsync(List<LogRecord>
records) {
- return writer.writeBulk(records).thenApply(ignore -> this);
+ CompletableFuture<DLOutputStream> result = new CompletableFuture<>();
+ writeAsyncHelper(inputStream, result);
+ return result;
}
/**
diff --git
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
index 63fcf5e46eb..b55e0e0d34a 100644
---
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
+++
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
@@ -21,17 +21,18 @@ package
org.apache.pulsar.packages.management.storage.bookkeeper;
import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.anyList;
@@ -53,9 +54,8 @@ public class DLOutputStreamTest {
when(dlm.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
when(writer.markEndOfStream()).thenReturn(CompletableFuture.completedFuture(null));
when(writer.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
- when(writer.writeBulk(anyList()))
- .thenReturn(CompletableFuture.completedFuture(
-
Collections.singletonList(CompletableFuture.completedFuture(DLSN.InitialDLSN))));
+ when(writer.write(any(LogRecord.class)))
+
.thenReturn(CompletableFuture.completedFuture(DLSN.InitialDLSN));
}
@AfterMethod(alwaysRun = true)
@@ -75,7 +75,7 @@ public class DLOutputStreamTest {
.thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
.thenCompose(DLOutputStream::closeAsync)).get();
- verify(writer, times(1)).writeBulk(anyList());
+ verify(writer, times(1)).write(any(LogRecord.class));
verify(writer, times(1)).markEndOfStream();
verify(writer, times(1)).asyncClose();
verify(dlm, times(1)).asyncClose();
@@ -91,7 +91,7 @@ public class DLOutputStreamTest {
.thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
.thenCompose(DLOutputStream::closeAsync)).get();
- verify(writer, times(1)).writeBulk(anyList());
+ verify(writer, times(1)).write(any(LogRecord.class));
verify(writer, times(1)).markEndOfStream();
verify(writer, times(1)).asyncClose();
verify(dlm, times(1)).asyncClose();
@@ -104,7 +104,7 @@ public class DLOutputStreamTest {
.thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
.thenCompose(DLOutputStream::closeAsync)).get();
- verify(writer, times(1)).writeBulk(anyList());
+ verify(writer, times(4)).write(any(LogRecord.class));
verify(writer, times(1)).markEndOfStream();
verify(writer, times(1)).asyncClose();
verify(dlm, times(1)).asyncClose();