This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit af9ae9b0624edc9465bd7665c19f2fe5516eb792 Author: Rui Fu <[email protected]> AuthorDate: Thu Aug 26 16:10:43 2021 +0800 [package management service] fix `wrappedBuffer` always using the same block of memory (#11782) (cherry picked from commit 12aef52f47eb497165df0cd19e4052b1bc9f9768) --- .../storage/bookkeeper/DLOutputStream.java | 4 ++-- .../bookkeeper/BookKeeperPackagesStorageTest.java | 20 +++++++++++++++++++- .../storage/bookkeeper/DLOutputStreamTest.java | 13 +++++++++++++ 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java index a951cd4..7329aa9 100644 --- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java +++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java @@ -58,8 +58,8 @@ class DLOutputStream { try { int read = 0; while ((read = inputStream.read(readBuffer)) != -1) { - log.info("write something into the ledgers " + offset); - ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read); + log.info("write something into the ledgers offset: {}, length: {}", offset, read); + ByteBuf writeBuf = Unpooled.copiedBuffer(readBuffer, 0, read); offset += writeBuf.readableBytes(); LogRecord record = new LogRecord(offset, writeBuf); records.add(record); diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java index 1e6df3f..339bafc 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java @@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import org.apache.commons.lang3.RandomUtils; import org.apache.distributedlog.exceptions.LogNotFoundException; import org.apache.distributedlog.exceptions.ZKException; import org.apache.pulsar.packages.management.core.PackagesStorage; @@ -41,7 +42,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase { private PackagesStorage storage; public BookKeeperPackagesStorageTest() { - super(1); + super(2); } @BeforeMethod() @@ -90,6 +91,23 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase { } @Test(timeOut = 60000) + public void testReadWriteLargeDataOperations() throws ExecutionException, InterruptedException { + byte[] data = RandomUtils.nextBytes(8192 * 3 + 4096); + ByteArrayInputStream testDataStream = new ByteArrayInputStream(data); + String testPath = "test-large-read-write"; + + // write some data to the dlog + storage.writeAsync(testPath, testDataStream).get(); + + // read the data from the dlog + ByteArrayOutputStream readData = new ByteArrayOutputStream(); + storage.readAsync(testPath, readData).get(); + byte[] readResult = readData.toByteArray(); + + assertEquals(data, readResult); + } + + @Test(timeOut = 60000) public void testReadNonExistentData() { String testPath = "non-existent-path"; ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java index 38578b0..c815a5d 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java @@ -97,6 +97,19 @@ public class DLOutputStreamTest { } @Test + public void writeLongBytesArrayData() throws ExecutionException, InterruptedException { + byte[] data = new byte[8192 * 3 + 4096]; + DLOutputStream.openWriterAsync(dlm) + .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data)) + .thenCompose(DLOutputStream::closeAsync)).get(); + + verify(writer, times(1)).writeBulk(any(List.class)); + verify(writer, times(1)).markEndOfStream(); + verify(writer, times(1)).asyncClose(); + verify(dlm, times(1)).asyncClose(); + } + + @Test public void openAsyncLogWriterFailed() { when(dlm.openAsyncLogWriter()).thenReturn(failedFuture(new Exception("Open writer was failed")));
