This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new dd7dc63 ISSUE #716: PendingAddOp accesses a released ByteBuf
dd7dc63 is described below
commit dd7dc6399504b246c65f7a09657597578b81cb6c
Author: Sijie Guo <[email protected]>
AuthorDate: Sat Nov 11 13:55:57 2017 -0800
ISSUE #716: PendingAddOp accesses a released ByteBuf
Descriptions of the changes in this PR:
- release `toSend` bytebuf only when a PendingAddOp is recycled.
- fix MockBookKeeperTestCase to run callbacks in the same executor thread.
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>
This closes #717 from sijie/issue_176, closes #716
---
.../org/apache/bookkeeper/client/PendingAddOp.java | 4 +-
.../bookkeeper/client/MockBookKeeperTestCase.java | 123 ++++++++++-----------
2 files changed, 62 insertions(+), 65 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index f4d05b7..5bc2b54 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -344,9 +344,6 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback, TimerTask {
timeout.cancel();
}
-
- ReferenceCountUtil.release(toSend);
-
if (LOG.isDebugEnabled()) {
LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(),
entryId, rc);
}
@@ -418,6 +415,7 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback, TimerTask {
private void recycle() {
entryId = LedgerHandle.INVALID_ENTRY_ID;
currentLedgerLength = -1;
+ ReferenceCountUtil.release(toSend);
payload = null;
toSend = null;
cb = null;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 6ef8901..338c2b7 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -17,6 +17,10 @@
*/
package org.apache.bookkeeper.client;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -24,7 +28,6 @@ import com.google.common.base.Optional;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -34,6 +37,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
@@ -48,12 +53,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.After;
import org.junit.Before;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
import org.mockito.Mockito;
-import static org.mockito.Mockito.doAnswer;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -237,13 +237,12 @@ public abstract class MockBookKeeperTestCase {
return mockLedgerMetadataRegistry.get(ledgerId);
}
+ @SuppressWarnings("unchecked")
private void setupReadLedgerMetadata() {
- doAnswer((Answer<Void>) new Answer<Void>() {
- @Override
- @SuppressWarnings("unchecked")
- public Void answer(InvocationOnMock invocation) throws Throwable {
- Object[] args = invocation.getArguments();
- Long ledgerId = (Long) args[0];
+ doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ Long ledgerId = (Long) args[0];
+ executor.submitOrdered(ledgerId, () -> {
BookkeeperInternalCallbacks.GenericCallback cb =
(BookkeeperInternalCallbacks.GenericCallback) args[1];
LedgerMetadata ledgerMetadata =
mockLedgerMetadataRegistry.get(ledgerId);
if (ledgerMetadata == null) {
@@ -251,26 +250,25 @@ public abstract class MockBookKeeperTestCase {
} else {
cb.operationComplete(BKException.Code.OK, new
LedgerMetadata(ledgerMetadata));
}
- return null;
- }
+ });
+ return null;
}).when(ledgerManager).readLedgerMetadata(anyLong(), any());
}
+ @SuppressWarnings("unchecked")
private void setupRemoveLedgerMetadata() {
- doAnswer((Answer<Void>) new Answer<Void>() {
- @Override
- @SuppressWarnings("unchecked")
- public Void answer(InvocationOnMock invocation) throws Throwable {
- Object[] args = invocation.getArguments();
- Long ledgerId = (Long) args[0];
+ doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ Long ledgerId = (Long) args[0];
+ executor.submitOrdered(ledgerId, () -> {
BookkeeperInternalCallbacks.GenericCallback cb =
(BookkeeperInternalCallbacks.GenericCallback) args[2];
if (mockLedgerMetadataRegistry.remove(ledgerId) != null) {
cb.operationComplete(BKException.Code.OK, null);
} else {
cb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null);
}
- return null;
- }
+ });
+ return null;
}).when(ledgerManager).removeLedgerMetadata(anyLong(), any(), any());
}
@@ -284,62 +282,58 @@ public abstract class MockBookKeeperTestCase {
}).when(ledgerManager).registerLedgerMetadataListener(anyLong(),
any());
}
+ @SuppressWarnings("unchecked")
private void setupLedgerIdGenerator() {
- Mockito.doAnswer((Answer<Void>) new Answer<Void>() {
- @Override
- @SuppressWarnings("unchecked")
- public Void answer(InvocationOnMock invocation) throws Throwable {
- Object[] args = invocation.getArguments();
- BookkeeperInternalCallbacks.GenericCallback cb =
(BookkeeperInternalCallbacks.GenericCallback) args[0];
- cb.operationComplete(BKException.Code.OK,
mockNextLedgerId.getAndIncrement());
- return null;
- }
+ doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ BookkeeperInternalCallbacks.GenericCallback cb =
(BookkeeperInternalCallbacks.GenericCallback) args[0];
+ cb.operationComplete(Code.OK, mockNextLedgerId.getAndIncrement());
+ return null;
}).when(ledgerIdGenerator).generateLedgerId(any());
}
+ @SuppressWarnings("unchecked")
private void setupCreateLedgerMetadata() {
- doAnswer((Answer<Void>) new Answer<Void>() {
- @Override
- @SuppressWarnings("unchecked")
- public Void answer(InvocationOnMock invocation) throws Throwable {
- Object[] args = invocation.getArguments();
- BookkeeperInternalCallbacks.GenericCallback cb =
(BookkeeperInternalCallbacks.GenericCallback) args[2];
- Long ledgerId = (Long) args[0];
+ doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ BookkeeperInternalCallbacks.GenericCallback cb =
(BookkeeperInternalCallbacks.GenericCallback) args[2];
+ Long ledgerId = (Long) args[0];
+ executor.submitOrdered(ledgerId, () -> {
LedgerMetadata ledgerMetadata = (LedgerMetadata) args[1];
mockLedgerMetadataRegistry.put(ledgerId, new
LedgerMetadata(ledgerMetadata));
cb.operationComplete(BKException.Code.OK, null);
- return null;
- }
+ });
+ return null;
}).when(ledgerManager).createLedgerMetadata(anyLong(), any(), any());
}
+ @SuppressWarnings("unchecked")
private void setupWriteLedgerMetadata() {
- doAnswer((Answer<Void>) new Answer<Void>() {
- @Override
- @SuppressWarnings("unchecked")
- public Void answer(InvocationOnMock invocation) throws Throwable {
- Object[] args = invocation.getArguments();
- Long ledgerId = (Long) args[0];
- LedgerMetadata metadata = (LedgerMetadata) args[1];
- BookkeeperInternalCallbacks.GenericCallback cb =
(BookkeeperInternalCallbacks.GenericCallback) args[2];
+ doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ Long ledgerId = (Long) args[0];
+ LedgerMetadata metadata = (LedgerMetadata) args[1];
+ BookkeeperInternalCallbacks.GenericCallback cb =
(BookkeeperInternalCallbacks.GenericCallback) args[2];
+ executor.submitOrdered(ledgerId, () -> {
mockLedgerMetadataRegistry.put(ledgerId, new
LedgerMetadata(metadata));
cb.operationComplete(BKException.Code.OK, null);
- return null;
- }
+ });
+ return null;
}).when(ledgerManager).writeLedgerMetadata(anyLong(), any(), any());
}
+ @SuppressWarnings("unchecked")
protected void setupBookieClientReadEntry() {
- doAnswer((Answer) (InvocationOnMock invokation) -> {
+ doAnswer(invokation -> {
Object[] args = invokation.getArguments();
BookkeeperInternalCallbacks.ReadEntryCallback callback =
(BookkeeperInternalCallbacks.ReadEntryCallback) args[4];
BookieSocketAddress bookieSocketAddress = (BookieSocketAddress)
args[0];
long ledgerId = (Long) args[1];
long entryId = (Long) args[3];
- DigestManager macManager = new CRC32DigestManager(ledgerId);
- fencedLedgers.add(ledgerId);
- submit(() -> {
+ executor.submitOrdered(ledgerId, () -> {
+ DigestManager macManager = new CRC32DigestManager(ledgerId);
+ fencedLedgers.add(ledgerId);
MockEntry mockEntry = getMockLedgerEntry(ledgerId,
bookieSocketAddress, entryId);
if (mockEntry != null) {
LOG.info("readEntryAndFenceLedger - found mock entry {}@{}
at {}", ledgerId, entryId, bookieSocketAddress);
@@ -356,16 +350,15 @@ public abstract class MockBookKeeperTestCase {
}).when(bookieClient).readEntryAndFenceLedger(any(), anyLong(), any(),
anyLong(),
any(BookkeeperInternalCallbacks.ReadEntryCallback.class), any());
- doAnswer((Answer) (InvocationOnMock invokation) -> {
+ doAnswer(invokation -> {
Object[] args = invokation.getArguments();
BookieSocketAddress bookieSocketAddress = (BookieSocketAddress)
args[0];
long ledgerId = (Long) args[1];
long entryId = (Long) args[2];
BookkeeperInternalCallbacks.ReadEntryCallback callback =
(BookkeeperInternalCallbacks.ReadEntryCallback) args[3];
- DigestManager macManager = new CRC32DigestManager(ledgerId);
-
- submit(() -> {
+ executor.submitOrdered(ledgerId, () -> {
+ DigestManager macManager = new CRC32DigestManager(ledgerId);
MockEntry mockEntry = getMockLedgerEntry(ledgerId,
bookieSocketAddress, entryId);
if (mockEntry != null) {
LOG.info("readEntry - found mock entry {}@{} at {}",
ledgerId, entryId, bookieSocketAddress);
@@ -395,8 +388,9 @@ public abstract class MockBookKeeperTestCase {
return entry;
}
+ @SuppressWarnings("unchecked")
protected void setupBookieClientAddEntry() {
- doAnswer((Answer) (InvocationOnMock invokation) -> {
+ doAnswer(invokation -> {
Object[] args = invokation.getArguments();
BookkeeperInternalCallbacks.WriteCallback callback =
(BookkeeperInternalCallbacks.WriteCallback) args[5];
BookieSocketAddress bookieSocketAddress = (BookieSocketAddress)
args[0];
@@ -405,9 +399,14 @@ public abstract class MockBookKeeperTestCase {
ByteBuf toSend = (ByteBuf) args[4];
Object ctx = args[6];
- byte[] entry = extractEntryPayload(ledgerId, entryId, toSend);
-
- submit(() -> {
+ executor.submitOrdered(ledgerId, () -> {
+ byte[] entry;
+ try {
+ entry = extractEntryPayload(ledgerId, entryId, toSend);
+ } catch (BKDigestMatchException e) {
+ callback.writeComplete(Code.DigestMatchException,
ledgerId, entryId, bookieSocketAddress, ctx);
+ return;
+ }
boolean fenced = fencedLedgers.contains(ledgerId);
if (fenced) {
callback.writeComplete(BKException.Code.LedgerFencedException,
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].