This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit af9ae9b0624edc9465bd7665c19f2fe5516eb792
Author: Rui Fu <[email protected]>
AuthorDate: Thu Aug 26 16:10:43 2021 +0800

    [package management service] fix `wrappedBuffer` always using the same 
block of memory  (#11782)
    
    (cherry picked from commit 12aef52f47eb497165df0cd19e4052b1bc9f9768)
---
 .../storage/bookkeeper/DLOutputStream.java           |  4 ++--
 .../bookkeeper/BookKeeperPackagesStorageTest.java    | 20 +++++++++++++++++++-
 .../storage/bookkeeper/DLOutputStreamTest.java       | 13 +++++++++++++
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
 
b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
index a951cd4..7329aa9 100644
--- 
a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
+++ 
b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
@@ -58,8 +58,8 @@ class DLOutputStream {
             try {
                 int read = 0;
                 while ((read = inputStream.read(readBuffer)) != -1) {
-                    log.info("write something into the ledgers " + offset);
-                    ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, 
read);
+                    log.info("write something into the ledgers offset: {}, 
length: {}", offset, read);
+                    ByteBuf writeBuf = Unpooled.copiedBuffer(readBuffer, 0, 
read);
                     offset += writeBuf.readableBytes();
                     LogRecord record = new LogRecord(offset, writeBuf);
                     records.add(record);
diff --git 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
 
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
index 1e6df3f..339bafc 100644
--- 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
+++ 
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
@@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.pulsar.packages.management.core.PackagesStorage;
@@ -41,7 +42,7 @@ public class BookKeeperPackagesStorageTest extends 
BookKeeperClusterTestCase {
     private PackagesStorage storage;
 
     public BookKeeperPackagesStorageTest() {
-        super(1);
+        super(2);
     }
 
     @BeforeMethod()
@@ -90,6 +91,23 @@ public class BookKeeperPackagesStorageTest extends 
BookKeeperClusterTestCase {
     }
 
     @Test(timeOut = 60000)
+    public void testReadWriteLargeDataOperations() throws ExecutionException, 
InterruptedException {
+        byte[] data = RandomUtils.nextBytes(8192 * 3 + 4096);
+        ByteArrayInputStream testDataStream = new ByteArrayInputStream(data);
+        String testPath = "test-large-read-write";
+
+        // write some data to the dlog
+        storage.writeAsync(testPath, testDataStream).get();
+
+        // read the data from the dlog
+        ByteArrayOutputStream readData = new ByteArrayOutputStream();
+        storage.readAsync(testPath, readData).get();
+        byte[] readResult = readData.toByteArray();
+
+        assertEquals(data, readResult);
+    }
+
+    @Test(timeOut = 60000)
     public void testReadNonExistentData() {
         String testPath = "non-existent-path";
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
diff --git 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
 
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
index 38578b0..c815a5d 100644
--- 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
+++ 
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
@@ -97,6 +97,19 @@ public class DLOutputStreamTest {
     }
 
     @Test
+    public void writeLongBytesArrayData() throws ExecutionException, 
InterruptedException {
+        byte[] data = new byte[8192 * 3 + 4096];
+        DLOutputStream.openWriterAsync(dlm)
+                .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
+                        .thenCompose(DLOutputStream::closeAsync)).get();
+
+        verify(writer, times(1)).writeBulk(any(List.class));
+        verify(writer, times(1)).markEndOfStream();
+        verify(writer, times(1)).asyncClose();
+        verify(dlm, times(1)).asyncClose();
+    }
+
+    @Test
     public void openAsyncLogWriterFailed() {
         when(dlm.openAsyncLogWriter()).thenReturn(failedFuture(new 
Exception("Open writer was failed")));
 

Reply via email to