This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new dd7dc63  ISSUE #716: PendingAddOp accesses a released ByteBuf
dd7dc63 is described below

commit dd7dc6399504b246c65f7a09657597578b81cb6c
Author: Sijie Guo <[email protected]>
AuthorDate: Sat Nov 11 13:55:57 2017 -0800

    ISSUE #716: PendingAddOp accesses a released ByteBuf
    
    Descriptions of the changes in this PR:
    
    - release `toSend` bytebuf only when a PendingAddOp is recycled.
    - fix MockBookKeeperTestCase to run callbacks in the same executor thread.
    
    Author: Sijie Guo <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>
    
    This closes #717 from sijie/issue_176, closes #716
---
 .../org/apache/bookkeeper/client/PendingAddOp.java |   4 +-
 .../bookkeeper/client/MockBookKeeperTestCase.java  | 123 ++++++++++-----------
 2 files changed, 62 insertions(+), 65 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index f4d05b7..5bc2b54 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -344,9 +344,6 @@ class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask {
             timeout.cancel();
         }
 
-
-        ReferenceCountUtil.release(toSend);
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(), 
entryId, rc);
         }
@@ -418,6 +415,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask {
     private void recycle() {
         entryId = LedgerHandle.INVALID_ENTRY_ID;
         currentLedgerLength = -1;
+        ReferenceCountUtil.release(toSend);
         payload = null;
         toSend = null;
         cb = null;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 6ef8901..338c2b7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -17,6 +17,10 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -24,7 +28,6 @@ import com.google.common.base.Optional;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -34,6 +37,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.client.api.OpenBuilder;
@@ -48,12 +53,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
 import org.junit.Before;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import org.mockito.Mockito;
-import static org.mockito.Mockito.doAnswer;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -237,13 +237,12 @@ public abstract class MockBookKeeperTestCase {
         return mockLedgerMetadataRegistry.get(ledgerId);
     }
 
+    @SuppressWarnings("unchecked")
     private void setupReadLedgerMetadata() {
-        doAnswer((Answer<Void>) new Answer<Void>() {
-            @Override
-            @SuppressWarnings("unchecked")
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                Object[] args = invocation.getArguments();
-                Long ledgerId = (Long) args[0];
+        doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            Long ledgerId = (Long) args[0];
+            executor.submitOrdered(ledgerId, () -> {
                 BookkeeperInternalCallbacks.GenericCallback cb = 
(BookkeeperInternalCallbacks.GenericCallback) args[1];
                 LedgerMetadata ledgerMetadata = 
mockLedgerMetadataRegistry.get(ledgerId);
                 if (ledgerMetadata == null) {
@@ -251,26 +250,25 @@ public abstract class MockBookKeeperTestCase {
                 } else {
                     cb.operationComplete(BKException.Code.OK, new 
LedgerMetadata(ledgerMetadata));
                 }
-                return null;
-            }
+            });
+            return null;
         }).when(ledgerManager).readLedgerMetadata(anyLong(), any());
     }
 
+    @SuppressWarnings("unchecked")
     private void setupRemoveLedgerMetadata() {
-        doAnswer((Answer<Void>) new Answer<Void>() {
-            @Override
-            @SuppressWarnings("unchecked")
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                Object[] args = invocation.getArguments();
-                Long ledgerId = (Long) args[0];
+        doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            Long ledgerId = (Long) args[0];
+            executor.submitOrdered(ledgerId, () -> {
                 BookkeeperInternalCallbacks.GenericCallback cb = 
(BookkeeperInternalCallbacks.GenericCallback) args[2];
                 if (mockLedgerMetadataRegistry.remove(ledgerId) != null) {
                     cb.operationComplete(BKException.Code.OK, null);
                 } else {
                     
cb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null);
                 }
-                return null;
-            }
+            });
+            return null;
         }).when(ledgerManager).removeLedgerMetadata(anyLong(), any(), any());
     }
 
@@ -284,62 +282,58 @@ public abstract class MockBookKeeperTestCase {
         }).when(ledgerManager).registerLedgerMetadataListener(anyLong(), 
any());
     }
 
+    @SuppressWarnings("unchecked")
     private void setupLedgerIdGenerator() {
-        Mockito.doAnswer((Answer<Void>) new Answer<Void>() {
-            @Override
-            @SuppressWarnings("unchecked")
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                Object[] args = invocation.getArguments();
-                BookkeeperInternalCallbacks.GenericCallback cb = 
(BookkeeperInternalCallbacks.GenericCallback) args[0];
-                cb.operationComplete(BKException.Code.OK, 
mockNextLedgerId.getAndIncrement());
-                return null;
-            }
+        doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            BookkeeperInternalCallbacks.GenericCallback cb = 
(BookkeeperInternalCallbacks.GenericCallback) args[0];
+            cb.operationComplete(Code.OK, mockNextLedgerId.getAndIncrement());
+            return null;
         }).when(ledgerIdGenerator).generateLedgerId(any());
     }
 
+    @SuppressWarnings("unchecked")
     private void setupCreateLedgerMetadata() {
-        doAnswer((Answer<Void>) new Answer<Void>() {
-            @Override
-            @SuppressWarnings("unchecked")
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                Object[] args = invocation.getArguments();
-                BookkeeperInternalCallbacks.GenericCallback cb = 
(BookkeeperInternalCallbacks.GenericCallback) args[2];
-                Long ledgerId = (Long) args[0];
+        doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            BookkeeperInternalCallbacks.GenericCallback cb = 
(BookkeeperInternalCallbacks.GenericCallback) args[2];
+            Long ledgerId = (Long) args[0];
+            executor.submitOrdered(ledgerId, () -> {
                 LedgerMetadata ledgerMetadata = (LedgerMetadata) args[1];
                 mockLedgerMetadataRegistry.put(ledgerId, new 
LedgerMetadata(ledgerMetadata));
                 cb.operationComplete(BKException.Code.OK, null);
-                return null;
-            }
+            });
+            return null;
         }).when(ledgerManager).createLedgerMetadata(anyLong(), any(), any());
     }
 
+    @SuppressWarnings("unchecked")
     private void setupWriteLedgerMetadata() {
-        doAnswer((Answer<Void>) new Answer<Void>() {
-            @Override
-            @SuppressWarnings("unchecked")
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                Object[] args = invocation.getArguments();
-                Long ledgerId = (Long) args[0];
-                LedgerMetadata metadata = (LedgerMetadata) args[1];
-                BookkeeperInternalCallbacks.GenericCallback cb = 
(BookkeeperInternalCallbacks.GenericCallback) args[2];
+        doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            Long ledgerId = (Long) args[0];
+            LedgerMetadata metadata = (LedgerMetadata) args[1];
+            BookkeeperInternalCallbacks.GenericCallback cb = 
(BookkeeperInternalCallbacks.GenericCallback) args[2];
+            executor.submitOrdered(ledgerId, () -> {
                 mockLedgerMetadataRegistry.put(ledgerId, new 
LedgerMetadata(metadata));
                 cb.operationComplete(BKException.Code.OK, null);
-                return null;
-            }
+            });
+            return null;
         }).when(ledgerManager).writeLedgerMetadata(anyLong(), any(), any());
     }
 
+    @SuppressWarnings("unchecked")
     protected void setupBookieClientReadEntry() {
-        doAnswer((Answer) (InvocationOnMock invokation) -> {
+        doAnswer(invokation -> {
             Object[] args = invokation.getArguments();
             BookkeeperInternalCallbacks.ReadEntryCallback callback = 
(BookkeeperInternalCallbacks.ReadEntryCallback) args[4];
             BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) 
args[0];
             long ledgerId = (Long) args[1];
             long entryId = (Long) args[3];
 
-            DigestManager macManager = new CRC32DigestManager(ledgerId);
-            fencedLedgers.add(ledgerId);
-            submit(() -> {
+            executor.submitOrdered(ledgerId, () -> {
+                DigestManager macManager = new CRC32DigestManager(ledgerId);
+                fencedLedgers.add(ledgerId);
                 MockEntry mockEntry = getMockLedgerEntry(ledgerId, 
bookieSocketAddress, entryId);
                 if (mockEntry != null) {
                     LOG.info("readEntryAndFenceLedger - found mock entry {}@{} 
at {}", ledgerId, entryId, bookieSocketAddress);
@@ -356,16 +350,15 @@ public abstract class MockBookKeeperTestCase {
         }).when(bookieClient).readEntryAndFenceLedger(any(), anyLong(), any(), 
anyLong(),
             any(BookkeeperInternalCallbacks.ReadEntryCallback.class), any());
 
-        doAnswer((Answer) (InvocationOnMock invokation) -> {
+        doAnswer(invokation -> {
             Object[] args = invokation.getArguments();
             BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) 
args[0];
             long ledgerId = (Long) args[1];
             long entryId = (Long) args[2];
             BookkeeperInternalCallbacks.ReadEntryCallback callback = 
(BookkeeperInternalCallbacks.ReadEntryCallback) args[3];
 
-            DigestManager macManager = new CRC32DigestManager(ledgerId);
-
-            submit(() -> {
+            executor.submitOrdered(ledgerId, () -> {
+                DigestManager macManager = new CRC32DigestManager(ledgerId);
                 MockEntry mockEntry = getMockLedgerEntry(ledgerId, 
bookieSocketAddress, entryId);
                 if (mockEntry != null) {
                     LOG.info("readEntry - found mock entry {}@{} at {}", 
ledgerId, entryId, bookieSocketAddress);
@@ -395,8 +388,9 @@ public abstract class MockBookKeeperTestCase {
         return entry;
     }
 
+    @SuppressWarnings("unchecked")
     protected void setupBookieClientAddEntry() {
-        doAnswer((Answer) (InvocationOnMock invokation) -> {
+        doAnswer(invokation -> {
             Object[] args = invokation.getArguments();
             BookkeeperInternalCallbacks.WriteCallback callback = 
(BookkeeperInternalCallbacks.WriteCallback) args[5];
             BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) 
args[0];
@@ -405,9 +399,14 @@ public abstract class MockBookKeeperTestCase {
             ByteBuf toSend = (ByteBuf) args[4];
             Object ctx = args[6];
 
-            byte[] entry = extractEntryPayload(ledgerId, entryId, toSend);
-
-            submit(() -> {
+            executor.submitOrdered(ledgerId, () -> {
+                byte[] entry;
+                try {
+                    entry = extractEntryPayload(ledgerId, entryId, toSend);
+                } catch (BKDigestMatchException e) {
+                    callback.writeComplete(Code.DigestMatchException, 
ledgerId, entryId, bookieSocketAddress, ctx);
+                    return;
+                }
                 boolean fenced = fencedLedgers.contains(ledgerId);
                 if (fenced) {
                     
callback.writeComplete(BKException.Code.LedgerFencedException,

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to