This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 12aef52 [package management service] fix `wrappedBuffer` always using
the same block of memory (#11782)
12aef52 is described below
commit 12aef52f47eb497165df0cd19e4052b1bc9f9768
Author: Rui Fu <[email protected]>
AuthorDate: Thu Aug 26 16:10:43 2021 +0800
[package management service] fix `wrappedBuffer` always using the same
block of memory (#11782)
---
.../storage/bookkeeper/DLOutputStream.java | 4 ++--
.../bookkeeper/BookKeeperPackagesStorageTest.java | 20 +++++++++++++++++++-
.../storage/bookkeeper/DLOutputStreamTest.java | 13 +++++++++++++
3 files changed, 34 insertions(+), 3 deletions(-)
diff --git
a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
index a951cd4..7329aa9 100644
---
a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
+++
b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
@@ -58,8 +58,8 @@ class DLOutputStream {
try {
int read = 0;
while ((read = inputStream.read(readBuffer)) != -1) {
- log.info("write something into the ledgers " + offset);
- ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0,
read);
+ log.info("write something into the ledgers offset: {},
length: {}", offset, read);
+ ByteBuf writeBuf = Unpooled.copiedBuffer(readBuffer, 0,
read);
offset += writeBuf.readableBytes();
LogRecord record = new LogRecord(offset, writeBuf);
records.add(record);
diff --git
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
index 1e6df3f..339bafc 100644
---
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
+++
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
@@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.pulsar.packages.management.core.PackagesStorage;
@@ -41,7 +42,7 @@ public class BookKeeperPackagesStorageTest extends
BookKeeperClusterTestCase {
private PackagesStorage storage;
public BookKeeperPackagesStorageTest() {
- super(1);
+ super(2);
}
@BeforeMethod()
@@ -90,6 +91,23 @@ public class BookKeeperPackagesStorageTest extends
BookKeeperClusterTestCase {
}
@Test(timeOut = 60000)
+ public void testReadWriteLargeDataOperations() throws ExecutionException,
InterruptedException {
+ byte[] data = RandomUtils.nextBytes(8192 * 3 + 4096);
+ ByteArrayInputStream testDataStream = new ByteArrayInputStream(data);
+ String testPath = "test-large-read-write";
+
+ // write some data to the dlog
+ storage.writeAsync(testPath, testDataStream).get();
+
+ // read the data from the dlog
+ ByteArrayOutputStream readData = new ByteArrayOutputStream();
+ storage.readAsync(testPath, readData).get();
+ byte[] readResult = readData.toByteArray();
+
+ assertEquals(data, readResult);
+ }
+
+ @Test(timeOut = 60000)
public void testReadNonExistentData() {
String testPath = "non-existent-path";
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
diff --git
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
index 38578b0..c815a5d 100644
---
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
+++
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
@@ -97,6 +97,19 @@ public class DLOutputStreamTest {
}
@Test
+ public void writeLongBytesArrayData() throws ExecutionException,
InterruptedException {
+ byte[] data = new byte[8192 * 3 + 4096];
+ DLOutputStream.openWriterAsync(dlm)
+ .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
+ .thenCompose(DLOutputStream::closeAsync)).get();
+
+ verify(writer, times(1)).writeBulk(any(List.class));
+ verify(writer, times(1)).markEndOfStream();
+ verify(writer, times(1)).asyncClose();
+ verify(dlm, times(1)).asyncClose();
+ }
+
+ @Test
public void openAsyncLogWriterFailed() {
when(dlm.openAsyncLogWriter()).thenReturn(failedFuture(new
Exception("Open writer was failed")));