This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3d7c3f8 Fix the potential race condition in the BlobStore readhandler (#12123)
3d7c3f8 is described below
commit 3d7c3f88af65b4cdf3e46723f468f038593de206
Author: Yong Zhang <[email protected]>
AuthorDate: Tue Oct 12 08:48:24 2021 +0800
Fix the potential race condition in the BlobStore readhandler (#12123)
* Fix the potential race condition in the BlobStore readhandler
---
*Motivation*
We found that the BlobStoreBackedReadHandleImpl can enter an
infinite loop when reading an offloaded ledger.
In a heap dump we saw two ledgers, 1 and 2: a consumer was reading
ledger 1, but the buffer it was reading from held ledger 2's data.
The read handler therefore read a wrong entry id, one out of the
range between the firstEntryId and the lastEntryId, and it kept
seeking for the right position, so it entered an infinite loop.
The buffer in the `BlobStoreBackedInputStreamImpl` held the wrong
data, so the read handler could not read the right entries from it.
After investigating, we found that this buffer is allocated from
`PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize)`,
which by default uses `PooledByteBufAllocator.DEFAULT` to allocate
the memory. The odd behavior we found is that a released buffer can
still read data once a new buffer has been allocated and written to.
This is the test code that demonstrates it:
```
import io.netty.buffer.ByteBuf;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;

ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
buf.writeByte(1);
System.out.println(buf.readByte()); // prints 1
buf.release();
//System.out.println(buf.readByte()); // would throw an exception here
ByteBuf newBuf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
newBuf.writeByte(2);
System.out.println(buf.readByte()); // prints 2: the released buffer reads memory reused by newBuf
newBuf.release();
//System.out.println(buf.readByte());
```
So we suspect there is a race condition between the read and the
close operation: a thread can still be reading after the read
handler has been closed.
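To illustrate the suspected interleaving, here is a minimal two-thread
sketch (hypothetical class and variable names, not the production code
path). Whether the post-close read throws an IllegalReferenceCountException
or silently returns foreign data depends on Netty's
io.netty.buffer.checkAccessible setting:
```
import java.util.concurrent.CountDownLatch;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class ReadCloseRaceSketch {
    public static void main(String[] args) throws Exception {
        // stands in for the buffer inside BlobStoreBackedInputStreamImpl
        ByteBuf streamBuffer = PooledByteBufAllocator.DEFAULT.buffer(1024, 1024);
        streamBuffer.writeByte(1);
        CountDownLatch closed = new CountDownLatch(1);

        // the close path releases the buffer back to the pool
        Thread closer = new Thread(() -> {
            streamBuffer.release();
            // the pool may now hand the same memory to the next allocation
            ByteBuf recycled = PooledByteBufAllocator.DEFAULT.buffer(1024, 1024);
            recycled.writeByte(2);
            recycled.release();
            closed.countDown();
        });
        closer.start();
        closed.await();

        // the read path runs after close: depending on checkAccessible this
        // either throws IllegalReferenceCountException or prints recycled data
        System.out.println(streamBuffer.getByte(0));
    }
}
```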
*Modifications*
- Add a state check before reading entries (sketched below).
- Exit the read loop when the entry id is greater than the last entry id.
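Distilled to its core, the fix looks like the following minimal sketch
(hypothetical names, not the patch itself; the volatile qualifier for
cross-thread visibility is an assumption beyond what the diff shows):
```
import java.io.IOException;

class StateGuardedReadHandle {
    enum State { Opened, Closed }

    // volatile so the reading thread sees the close promptly (our assumption)
    private volatile State state = State.Opened;

    void close() {
        state = State.Closed;
    }

    void read(long firstEntry, long lastEntry) throws IOException {
        long nextExpectedId = firstEntry;
        while (nextExpectedId <= lastEntry) {
            if (state == State.Closed) {
                // first guard: stop instead of spinning on a buffer whose memory may be reused
                throw new IOException("read handle already closed");
            }
            long entryId = readNextEntryId();
            if (entryId > lastEntry) {
                // second guard: an entry id past the range ends the loop instead of re-seeking forever
                throw new IOException("entry id " + entryId + " is beyond the requested range");
            }
            // ... read the entry payload here, then advance ...
            nextExpectedId++;
        }
    }

    private long readNextEntryId() {
        // stand-in for dataStream.readLong() in the real handle
        return 0;
    }
}
```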
---
.../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 111 +++++++++++++--------
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 18 +++-
tiered-storage/jcloud/src/test/resources/ledger-1 | Bin 0 -> 205 bytes
.../jcloud/src/test/resources/ledger-1-index | Bin 0 -> 207 bytes
tiered-storage/jcloud/src/test/resources/ledger-2 | Bin 0 -> 284 bytes
.../jcloud/src/test/resources/ledger-2-index | Bin 0 -> 208 bytes
6 files changed, 89 insertions(+), 40 deletions(-)
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 45b0942..2bf380d 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -54,6 +55,13 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
private final DataInputStream dataStream;
private final ExecutorService executor;
+ enum State {
+ Opened,
+ Closed
+ }
+
+ private State state = null;
+
private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
                                      BackedInputStream inputStream,
                                      ExecutorService executor) {
@@ -62,6 +70,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
this.inputStream = inputStream;
this.dataStream = new DataInputStream(inputStream);
this.executor = executor;
+ state = State.Opened;
}
@Override
@@ -81,6 +90,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
try {
index.close();
inputStream.close();
+ state = State.Closed;
promise.complete(null);
} catch (IOException t) {
promise.completeExceptionally(t);
@@ -94,6 +104,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.submit(() -> {
+ List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+ try {
if (firstEntry > lastEntry
|| firstEntry < 0
|| lastEntry > getLastAddConfirmed()) {
@@ -101,49 +113,64 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
return;
}
long entriesToRead = (lastEntry - firstEntry) + 1;
- List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
long nextExpectedId = firstEntry;
- try {
- while (entriesToRead > 0) {
- int length = dataStream.readInt();
- if (length < 0) { // hit padding or new block
- inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
- continue;
- }
- long entryId = dataStream.readLong();
-
- if (entryId == nextExpectedId) {
- ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
- entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
- int toWrite = length;
- while (toWrite > 0) {
- toWrite -= buf.writeBytes(dataStream, toWrite);
- }
- entriesToRead--;
- nextExpectedId++;
- } else if (entryId > nextExpectedId) {
- inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
- continue;
- } else if (entryId < nextExpectedId
- && !index.getIndexEntryForEntry(nextExpectedId).equals(
- index.getIndexEntryForEntry(entryId))) {
- inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
- continue;
- } else if (entryId > lastEntry) {
- log.info("Expected to read {}, but read {}, which is greater than last entry {}",
- nextExpectedId, entryId, lastEntry);
- throw new BKException.BKUnexpectedConditionException();
- } else {
- long ignored = inputStream.skip(length);
+
+ // Seek to the first entry's position before reading; otherwise the first read may return
+ // an unexpected entry ID that is out of the range between firstEntry and lastEntry.
+ // For example, if the first request reads entries 1-10 and the next request asks for 2-9,
+ // the code below would read an entry ID from the stream that is not the expected one and
+ // then seek to the correct position before reading on. But that entry ID may exceed the
+ // last entry ID, which makes it hard to know the edge of the requested range.
+ inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
+
+ while (entriesToRead > 0) {
+ if (state == State.Closed) {
+ log.warn("Reading a closed read handler. Ledger ID:
{}, Read range: {}-{}", ledgerId, firstEntry, lastEntry);
+ throw new BKException.BKUnexpectedConditionException();
+ }
+ int length = dataStream.readInt();
+ if (length < 0) { // hit padding or new block
+ inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+ continue;
+ }
+ long entryId = dataStream.readLong();
+
+ if (entryId == nextExpectedId) {
+ ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
+ entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
+ int toWrite = length;
+ while (toWrite > 0) {
+ toWrite -= buf.writeBytes(dataStream, toWrite);
}
+ entriesToRead--;
+ nextExpectedId++;
+ } else if (entryId > nextExpectedId && entryId < lastEntry) {
+ log.warn("The read entry {} is not the expected entry
{} but in the range of {} - {},"
+ + " seeking to the right position", entryId,
nextExpectedId, nextExpectedId, lastEntry);
+ inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+ continue;
+ } else if (entryId < nextExpectedId
+ && !index.getIndexEntryForEntry(nextExpectedId).equals(index.getIndexEntryForEntry(entryId))) {
+ log.warn("Read an unexpected entry id {} which is
smaller than the next expected entry id {}"
+ + ", seeking to the right position", entries,
nextExpectedId);
+ inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+ continue;
+ } else if (entryId > lastEntry) {
+ log.info("Expected to read {}, but read {}, which is
greater than last entry {}",
+ nextExpectedId, entryId, lastEntry);
+ throw new BKException.BKUnexpectedConditionException();
+ } else {
+ long ignore = inputStream.skip(length);
}
-
- promise.complete(LedgerEntriesImpl.create(entries));
- } catch (Throwable t) {
- promise.completeExceptionally(t);
- entries.forEach(LedgerEntry::close);
}
- });
+
+ promise.complete(LedgerEntriesImpl.create(entries));
+ } catch (Throwable t) {
+ promise.completeExceptionally(t);
+ entries.forEach(LedgerEntry::close);
+ }
+ });
return promise;
}
@@ -203,6 +230,12 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
versionCheck,
index.getDataObjectLength(),
readBufferSize);
+
return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
}
+
+ // for testing
+ State getState() {
+ return this.state;
+ }
}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 299baff..90d8b11 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -141,6 +141,22 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
}
}
+ @Test(timeOut = 60000)
+ public void testReadHandlerState() throws Exception {
+ ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
+ LedgerOffloader offloader = getOffloader();
+
+ UUID uuid = UUID.randomUUID();
+ offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+ BlobStoreBackedReadHandleImpl toTest = (BlobStoreBackedReadHandleImpl) offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
+ Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+ Assert.assertEquals(toTest.getState(), BlobStoreBackedReadHandleImpl.State.Opened);
+ toTest.read(0, 1);
+ toTest.close();
+ Assert.assertEquals(toTest.getState(), BlobStoreBackedReadHandleImpl.State.Closed);
+ }
+
@Test
public void testOffloadFailInitDataBlockUpload() throws Exception {
ReadHandle readHandle = buildReadHandle();
@@ -461,4 +477,4 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
}
}
-}
\ No newline at end of file
+}
diff --git a/tiered-storage/jcloud/src/test/resources/ledger-1 b/tiered-storage/jcloud/src/test/resources/ledger-1
new file mode 100644
index 0000000..e1e8d3e
Binary files /dev/null and b/tiered-storage/jcloud/src/test/resources/ledger-1 differ
diff --git a/tiered-storage/jcloud/src/test/resources/ledger-1-index b/tiered-storage/jcloud/src/test/resources/ledger-1-index
new file mode 100644
index 0000000..f560e37
Binary files /dev/null and b/tiered-storage/jcloud/src/test/resources/ledger-1-index differ
diff --git a/tiered-storage/jcloud/src/test/resources/ledger-2 b/tiered-storage/jcloud/src/test/resources/ledger-2
new file mode 100644
index 0000000..81c3680
Binary files /dev/null and b/tiered-storage/jcloud/src/test/resources/ledger-2 differ
diff --git a/tiered-storage/jcloud/src/test/resources/ledger-2-index b/tiered-storage/jcloud/src/test/resources/ledger-2-index
new file mode 100644
index 0000000..cda3ced
Binary files /dev/null and b/tiered-storage/jcloud/src/test/resources/ledger-2-index differ