This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new f70e52a700d [fix][broker] Fix broker OOM when upload a large package. (#22989)
f70e52a700d is described below

commit f70e52a700d3348dbdb2615495a2beb16c790f23
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Jul 2 08:46:56 2024 +0800

    [fix][broker] Fix broker OOM when upload a large package. (#22989)
    
    (cherry picked from commit da2a1910a32e622ea609ff7b9e91711ecaf36de6)
---
 .../storage/bookkeeper/DLOutputStream.java         | 53 ++++++++++------------
 .../storage/bookkeeper/DLOutputStreamTest.java     | 14 +++---
 2 files changed, 31 insertions(+), 36 deletions(-)

diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
index 222987aa49d..67345ebd47e 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
@@ -22,8 +22,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.LogRecord;
@@ -38,6 +36,7 @@ class DLOutputStream {
 
     private final DistributedLogManager distributedLogManager;
     private final AsyncLogWriter writer;
+    private final byte[] readBuffer = new byte[8192];
     private long offset = 0L;
 
     private DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter writer) {
@@ -50,42 +49,38 @@ class DLOutputStream {
         return distributedLogManager.openAsyncLogWriter().thenApply(w -> new DLOutputStream(distributedLogManager, w));
     }
 
-    private CompletableFuture<List<LogRecord>> getRecords(InputStream inputStream) {
-        CompletableFuture<List<LogRecord>> future = new CompletableFuture<>();
-        CompletableFuture.runAsync(() -> {
-            byte[] readBuffer = new byte[8192];
-            List<LogRecord> records = new ArrayList<>();
-            try {
-                int read = 0;
-                while ((read = inputStream.read(readBuffer)) != -1) {
-                    log.info("write something into the ledgers offset: {}, length: {}", offset, read);
-                    ByteBuf writeBuf = Unpooled.copiedBuffer(readBuffer, 0, read);
-                    offset += writeBuf.readableBytes();
-                    LogRecord record = new LogRecord(offset, writeBuf);
-                    records.add(record);
-                }
-                future.complete(records);
-            } catch (IOException e) {
-                log.error("Failed to get all records from the input stream", e);
-                future.completeExceptionally(e);
+    private void writeAsyncHelper(InputStream is, CompletableFuture<DLOutputStream> result) {
+        try {
+            int read = is.read(readBuffer);
+            if (read != -1) {
+                log.info("write something into the ledgers offset: {}, length: {}", offset, read);
+                final ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read);
+                offset += writeBuf.readableBytes();
+                final LogRecord record = new LogRecord(offset, writeBuf);
+                writer.write(record).thenAccept(v -> writeAsyncHelper(is, result))
+                        .exceptionally(e -> {
+                            result.completeExceptionally(e);
+                            return null;
+                        });
+            } else {
+                result.complete(this);
             }
-        });
-        return future;
+        } catch (IOException e) {
+            log.error("Failed to get all records from the input stream", e);
+            result.completeExceptionally(e);
+        }
     }
 
     /**
      * Write all input stream data to the distribute log.
      *
      * @param inputStream the data we need to write
-     * @return
+     * @return CompletableFuture<DLOutputStream>
      */
     CompletableFuture<DLOutputStream> writeAsync(InputStream inputStream) {
-        return getRecords(inputStream)
-            .thenCompose(this::writeAsync);
-    }
-
-    private CompletableFuture<DLOutputStream> writeAsync(List<LogRecord> records) {
-        return writer.writeBulk(records).thenApply(ignore -> this);
+        CompletableFuture<DLOutputStream> result = new CompletableFuture<>();
+        writeAsyncHelper(inputStream, result);
+        return result;
     }
 
     /**
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
index 63fcf5e46eb..b55e0e0d34a 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
@@ -21,17 +21,18 @@ package org.apache.pulsar.packages.management.storage.bookkeeper;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
 import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.anyList;
@@ -53,9 +54,8 @@ public class DLOutputStreamTest {
         when(dlm.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
         when(writer.markEndOfStream()).thenReturn(CompletableFuture.completedFuture(null));
         when(writer.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
-        when(writer.writeBulk(anyList()))
-                .thenReturn(CompletableFuture.completedFuture(
-                        Collections.singletonList(CompletableFuture.completedFuture(DLSN.InitialDLSN))));
+        when(writer.write(any(LogRecord.class)))
+                .thenReturn(CompletableFuture.completedFuture(DLSN.InitialDLSN));
     }
 
     @AfterMethod(alwaysRun = true)
@@ -75,7 +75,7 @@ public class DLOutputStreamTest {
             .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
                 .thenCompose(DLOutputStream::closeAsync)).get();
 
-        verify(writer, times(1)).writeBulk(anyList());
+        verify(writer, times(1)).write(any(LogRecord.class));
         verify(writer, times(1)).markEndOfStream();
         verify(writer, times(1)).asyncClose();
         verify(dlm, times(1)).asyncClose();
@@ -91,7 +91,7 @@ public class DLOutputStreamTest {
             .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
                 .thenCompose(DLOutputStream::closeAsync)).get();
 
-        verify(writer, times(1)).writeBulk(anyList());
+        verify(writer, times(1)).write(any(LogRecord.class));
         verify(writer, times(1)).markEndOfStream();
         verify(writer, times(1)).asyncClose();
         verify(dlm, times(1)).asyncClose();
@@ -104,7 +104,7 @@ public class DLOutputStreamTest {
                 .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
                         .thenCompose(DLOutputStream::closeAsync)).get();
 
-        verify(writer, times(1)).writeBulk(anyList());
+        verify(writer, times(4)).write(any(LogRecord.class));
         verify(writer, times(1)).markEndOfStream();
         verify(writer, times(1)).asyncClose();
         verify(dlm, times(1)).asyncClose();

Reply via email to