This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a704afd  Support Add-entry timeout at broker to avoid stuck topics 
(#3347)
a704afd is described below

commit a704afd1946c5648f06b12031320359d5aef623d
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Jan 15 07:16:41 2019 -0800

    Support Add-entry timeout at broker to avoid stuck topics (#3347)
    
    ### Motivation
    
    Recently and in past, we have seen few instances where bookie crashes when 
it goes out of memory and kernel panics and broker doesn't complete add-entry 
callback because of that topics get stuck on pending-writes.
    
    ```
    2010-01-01T00:18:19+0000 2010-01-01 00:18:19,594 21907 report_stuck:556 
WARNING persistent://prop1/global/prod-rt-mxt/extractions-partition-0 is stuck 
(broker1.us-west1.com): waiting for write
    2010-01-01T00:18:22+0000 2010-01-01 00:18:22,654 21907 report_stuck:556 
WARNING persistent://prop1/global/mbr-events/27_bf1 is stuck 
(broker1.us-west1.com): waiting for write
    2010-01-01T00:19:10+0000 2010-01-01 00:19:10,790 21907 report_stuck:556 
WARNING persistent://prop1/global/mbr-events/30_bf1 is stuck 
(broker1.us-west1.com): waiting for write
    2010-01-01T00:19:14+0000 2010-01-01 00:19:14,692 21907 report_stuck:556 
WARNING persistent://prop1/global/mbr-events/35_gq2 is stuck 
(broker1.us-west1.com): waiting for write
    2010-01-01T00:19:15+0000 2010-01-01 00:19:15,809 21907 report_stuck:556 
WARNING persistent://prop1/global/jedi-events/batchevents-partition-35 is stuck 
(broker1.us-west1.com): waiting for write
    2010-01-01T00:19:25+0000 2010-01-01 00:19:25,127 21907 report_stuck:556 
WARNING persistent://prop1/global/jedi-events/mailevents-partition-24 is stuck 
(broker1.us-west1.com): waiting for write
    ```
    
    internal-stats
    ```
    {
      "entriesAddedCounter" : 66066317,
      "numberOfEntries" : 14129359,
      "totalSize" : 145505085493,
      "currentLedgerEntries" : 238938,
      "currentLedgerSize" : 3715327400,
      "lastLedgerCreatedTimestamp" : "2018-11-27 00:03:03.125+0000",
      "waitingCursorsCount" : 0,
      "pendingAddEntriesCount" : 252514,
      "lastConfirmedEntry" : "489781963:26121",
      "state" : "ClosingLedger",
      "ledgers" : [ {
        "ledgerId" : 489765238,
        "entries" : 559855,
        "size" : 6928710817
      }, {
    ...
    ```
    
    ### Modifications
    
    Add support of write timeout which can be disable by configuring timeout=0.
    
    ### Result
    
    topic will not stuck in pending write in case add-entry callback doesn't 
completes.
---
 conf/broker.conf                                   |  3 +
 conf/standalone.conf                               |  3 +
 .../bookkeeper/mledger/ManagedLedgerConfig.java    | 16 ++++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 30 ++++++++
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 67 +++++++++++++++---
 .../bookkeeper/client/PulsarMockLedgerHandle.java  |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 82 ++++++++++++++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  9 ++-
 .../pulsar/broker/service/BrokerService.java       |  1 +
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  7 ++
 .../client/impl/BrokerClientIntegrationTest.java   | 76 +++++++++++++++++++-
 site/_data/config/broker.yaml                      |  3 +
 12 files changed, 282 insertions(+), 17 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 1cf3a8a..3c2e5ac 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -445,6 +445,9 @@ managedLedgerMetadataOperationsTimeoutSeconds=60
 # Read entries timeout when broker tries to read messages from bookkeeper.
 managedLedgerReadEntryTimeoutSeconds=120
 
+# Add entry timeout when broker tries to publish message to bookkeeper (0 to 
disable it).
+managedLedgerAddEntryTimeoutSeconds=120
+
 ### --- Load balancer --- ###
 
 # Enable load balancer
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 16d6465..06be447 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -332,6 +332,9 @@ managedLedgerMetadataOperationsTimeoutSeconds=60
 # Read entries timeout when broker tries to read messages from bookkeeper.
 managedLedgerReadEntryTimeoutSeconds=120
 
+# Add entry timeout when broker tries to publish message to bookkeeper (0 to 
disable it).
+managedLedgerAddEntryTimeoutSeconds=120
+
 ### --- Load balancer --- ###
 
 loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 255b534..8f89050 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -58,7 +58,7 @@ public class ManagedLedgerConfig {
     private long offloadAutoTriggerSizeThresholdBytes = -1;
     private long metadataOperationsTimeoutSeconds = 60;
     private long readEntryTimeoutSeconds = 120;
-
+    private long addEntryTimeoutSeconds = 120;
     private DigestType digestType = DigestType.CRC32C;
     private byte[] password = "".getBytes(Charsets.UTF_8);
     private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
@@ -554,4 +554,18 @@ public class ManagedLedgerConfig {
         this.readEntryTimeoutSeconds = readEntryTimeoutSeconds;
         return this;
     }
+    
+    public long getAddEntryTimeoutSeconds() {
+        return addEntryTimeoutSeconds;
+    }
+
+    /**
+     * Add-entry timeout after which add-entry callback will be failed if 
add-entry is not succeeded.
+     * 
+     * @param addEntryTimeoutSeconds
+     */
+    public ManagedLedgerConfig setAddEntryTimeoutSeconds(long 
addEntryTimeoutSeconds) {
+        this.addEntryTimeoutSeconds = addEntryTimeoutSeconds;
+        return this;
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index dd54630..26b3acc 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.Math.min;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
 import java.time.Clock;
@@ -105,6 +106,7 @@ import 
org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -166,6 +168,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;
 
     final EntryCache entryCache;
+    
+    private ScheduledFuture<?> timeoutTask;
 
     /**
      * This lock is held while the ledgers list is updated asynchronously on 
the metadata store. Since we use the store
@@ -328,6 +332,28 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 }
             }
         });
+        
+        scheduleTimeoutTask();
+    }
+
+    private void scheduleTimeoutTask() {
+        long timeoutSec = config.getAddEntryTimeoutSeconds();
+        // disable timeout task checker if timeout <= 0
+        if (timeoutSec > 0) {
+            this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(() 
-> {
+                OpAddEntry opAddEntry = pendingAddEntries.peek();
+                if (opAddEntry != null) {
+                    boolean isTimedOut = opAddEntry.lastInitTime != -1
+                            && 
TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= 
timeoutSec
+                            && opAddEntry.completed == FALSE;
+                    if (isTimedOut) {
+                        log.error("Failed to add entry for ledger {} in 
time-out {} sec",
+                                (opAddEntry.ledger != null ? 
opAddEntry.ledger.getId() : -1), timeoutSec);
+                        opAddEntry.handleAddFailure(opAddEntry.ledger);
+                    }
+                }
+            }, config.getAddEntryTimeoutSeconds(), 
config.getAddEntryTimeoutSeconds(), TimeUnit.SECONDS);
+        }
     }
 
     private synchronized void initializeBookKeeper(final 
ManagedLedgerInitializeLedgerCallback callback) {
@@ -1146,6 +1172,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
             closeAllCursors(callback, ctx);
         }, null);
+        
+        if (this.timeoutTask != null) {
+            this.timeoutTask.cancel(false);
+        }
     }
 
     private void closeAllCursors(CloseCallback callback, final Object ctx) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 5d9d57c..0cef300 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -23,7 +23,10 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -35,6 +38,8 @@ import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
 
 /**
  * Handles the life-cycle of an addEntry() operation.
@@ -42,7 +47,7 @@ import org.slf4j.LoggerFactory;
  */
 class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
     private ManagedLedgerImpl ml;
-    private LedgerHandle ledger;
+    LedgerHandle ledger;
     private long entryId;
 
     @SuppressWarnings("unused")
@@ -50,6 +55,11 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
     private Object ctx;
     private boolean closeWhenDone;
     private long startTime;
+    volatile long lastInitTime;
+    private static final AtomicIntegerFieldUpdater<OpAddEntry> 
COMPLETED_UPDATER =
+        AtomicIntegerFieldUpdater.newUpdater(OpAddEntry.class, "completed");
+    @SuppressWarnings("unused")
+    volatile int completed = FALSE;
     ByteBuf data;
     private int dataLength;
 
@@ -67,6 +77,7 @@ class OpAddEntry extends SafeRunnable implements AddCallback, 
CloseCallback {
         op.closeWhenDone = false;
         op.entryId = -1;
         op.startTime = System.nanoTime();
+        op.completed = FALSE;
         ml.mbean.addAddEntrySample(op.dataLength);
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
@@ -86,6 +97,7 @@ class OpAddEntry extends SafeRunnable implements AddCallback, 
CloseCallback {
         ByteBuf duplicateBuffer = data.retainedDuplicate();
         // duplicatedBuffer has refCnt=1 at this point
 
+        lastInitTime = System.nanoTime();
         ledger.asyncAddEntry(duplicateBuffer, this, ctx);
 
         // Internally, asyncAddEntry() is refCnt neutral to respect to the 
passed buffer and it will keep a ref on it
@@ -95,6 +107,9 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
     }
 
     public void failed(ManagedLedgerException e) {
+        if (!checkAndCompleteTimeoutTask()) {
+            return;
+        }
         AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
         data.release();
         if (cb != null) {
@@ -119,17 +134,11 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
         }
 
         if (rc != BKException.Code.OK) {
-            // If we get a write error, we will try to create a new ledger and 
re-submit the pending writes. If the
-            // ledger creation fails (persistent bk failure, another instanche 
owning the ML, ...), then the writes will
-            // be marked as failed.
-            ml.mbean.recordAddEntryError();
-
-            ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() 
-> {
-                // Force the creation of a new ledger. Doing it in a 
background thread to avoid acquiring ML lock
-                // from a BK callback.
-                ml.ledgerClosed(lh);
-            }));
+            handleAddFailure(lh);
         } else {
+            if(!checkAndCompleteTimeoutTask()) {
+                return;
+            }
             // Trigger addComplete callback in a thread hashed on the managed 
ledger name
             ml.getExecutor().executeOrdered(ml.getName(), this);
         }
@@ -200,6 +209,40 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
         ml.mbean.addAddEntryLatencySample(System.nanoTime() - startTime, 
TimeUnit.NANOSECONDS);
     }
 
+    /**
+     * It cancels timeout task and checks if add-entry operation is not 
completed yet.
+     * 
+     * @return true if task is not already completed else returns false.
+     */
+    private boolean checkAndCompleteTimeoutTask() {
+        if (!COMPLETED_UPDATER.compareAndSet(this, FALSE, TRUE)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Add-entry already completed for {}-{}", this.ledger 
!= null ? this.ledger.getId() : -1,
+                        this.entryId);
+            }
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * It handles add failure on the given ledger. it can be triggered when 
add-entry fails or times out.
+     * 
+     * @param ledger
+     */
+    void handleAddFailure(final LedgerHandle ledger) {
+        // If we get a write error, we will try to create a new ledger and 
re-submit the pending writes. If the
+        // ledger creation fails (persistent bk failure, another instanche 
owning the ML, ...), then the writes will
+        // be marked as failed.
+        ml.mbean.recordAddEntryError();
+
+        ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() -> {
+            // Force the creation of a new ledger. Doing it in a background 
thread to avoid acquiring ML lock
+            // from a BK callback.
+            ml.ledgerClosed(ledger);
+        }));
+    }
+    
     private final Handle<OpAddEntry> recyclerHandle;
 
     private OpAddEntry(Handle<OpAddEntry> recyclerHandle) {
@@ -220,8 +263,10 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
         callback = null;
         ctx = null;
         closeWhenDone = false;
+        completed = FALSE;
         entryId = -1;
         startTime = -1;
+        lastInitTime = -1;
         recyclerHandle.recycle(this);
     }
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index cedd3b0..2397a62 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -59,7 +59,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
     long lastEntry = -1;
     boolean fenced = false;
 
-    PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
+    public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
                            DigestType digest, byte[] passwd) throws 
GeneralSecurityException {
         super(bk, id, new LedgerMetadata(3, 3, 2, DigestType.MAC, 
"".getBytes()), DigestType.MAC, "".getBytes(),
                 EnumSet.noneOf(WriteFlag.class));
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index a16dcd2..6ed13a5 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -34,8 +35,10 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.charset.Charset;
+import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -46,10 +49,12 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -57,6 +62,13 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
@@ -2324,4 +2336,74 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 
         ledger.close();
     }
+
+    /**
+     * It verifies that if bk-client doesn't complete the add-entry in given 
time out then broker is resilient enought
+     * to create new ledger and add entry successfully.
+     * 
+     * 
+     * @throws Exception
+     */
+    @Test(timeOut = 20000)
+    public void testManagedLedgerWithAddEntryTimeOut() throws Exception {
+        ManagedLedgerConfig config = new 
ManagedLedgerConfig().setAddEntryTimeoutSeconds(1);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("timeout_ledger_test", config);
+
+        BookKeeper bk = mock(BookKeeper.class);
+        doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), 
any(), any(), any(), any(), any());
+
+        PulsarMockBookKeeper bkClient = mock(PulsarMockBookKeeper.class);
+        ClientConfiguration conf = new ClientConfiguration();
+        doReturn(conf).when(bkClient).getConf();
+        class MockLedgerHandle extends PulsarMockLedgerHandle {
+            public MockLedgerHandle(PulsarMockBookKeeper bk, long id, 
DigestType digest, byte[] passwd)
+                    throws GeneralSecurityException {
+                super(bk, id, digest, passwd);
+            }
+
+            @Override
+            public void asyncAddEntry(final byte[] data, final AddCallback cb, 
final Object ctx) {
+                // do nothing
+            }
+
+            @Override
+            public void 
asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback cb, Object 
ctx) {
+                cb.closeComplete(BKException.Code.OK, this, ctx);
+            }
+        }
+        MockLedgerHandle ledgerHandle = mock(MockLedgerHandle.class);
+        final String data = "data";
+        doNothing().when(ledgerHandle).asyncAddEntry(data.getBytes(), null, 
null);
+        AtomicBoolean addSuccess = new AtomicBoolean();
+
+        setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", 
ledgerHandle);
+
+        final int totalAddEntries = 1;
+        CountDownLatch latch = new CountDownLatch(totalAddEntries);
+        ledger.asyncAddEntry(data.getBytes(), new AddEntryCallback() {
+
+            @Override
+            public void addComplete(Position position, Object ctx) {
+                addSuccess.set(true);
+                latch.countDown();
+            }
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                latch.countDown();
+            }
+        }, null);
+
+        latch.await();
+
+        assertTrue(addSuccess.get());
+
+        setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", null);
+    }
+
+    private void setFieldValue(Class clazz, Object classObj, String fieldName, 
Object fieldValue) throws Exception {
+        Field field = clazz.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(classObj, fieldValue);
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9625987..048aecd 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -757,14 +757,17 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         doc = "operation timeout while updating managed-ledger metadata."
     )
     private long managedLedgerMetadataOperationsTimeoutSeconds = 60;
+
     @FieldContext(
             category = CATEGORY_STORAGE_ML,
             doc = "Read entries timeout when broker tries to read messages 
from bookkeeper "
-                    + "(disable timeout by setting readTimeoutSeconds <= 0)"
+                    + "(0 to disable it)"
         )
-    private long managedLedgerReadEntryTimeoutSeconds = 60;
+    private long managedLedgerReadEntryTimeoutSeconds = 120;
         
-    
+    @FieldContext(category = CATEGORY_STORAGE_ML, 
+            doc = "Add entry timeout when broker tries to publish message to 
bookkeeper.(0 to disable it)")
+    private long managedLedgerAddEntryTimeoutSeconds = 120;
 
     /*** --- Load balancer --- ****/
     @FieldContext(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b2b5103..ff2a8dc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -745,6 +745,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
             managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
                     
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
             
managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+            
managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
             
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
             
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
             
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index e0130d2..0720d3e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
@@ -290,5 +291,11 @@ public abstract class MockedPulsarServiceBaseTest {
         }
     }
 
+    public static void setFieldValue(Class clazz, Object classObj, String 
fieldName, Object fieldValue) throws Exception {
+        Field field = clazz.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(classObj, fieldValue);
+    }
+    
     private static final Logger log = 
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 6988863..feb5b9e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -23,9 +23,12 @@ import static 
org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONF
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -33,6 +36,7 @@ import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
 import java.net.URI;
+import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.IdentityHashMap;
 import java.util.List;
@@ -46,16 +50,24 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -757,4 +769,66 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = 20000)
+    public void testAddEntryOperationTimeout() throws Exception {
+
+        log.info("-- Starting {} test --", methodName);
+
+        conf.setManagedLedgerAddEntryTimeoutSeconds(1);
+
+        final String topicName = 
"persistent://my-property/my-ns/addEntryTimeoutTopic";
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscriber-name").subscribe();
+
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+
+        class MockLedgerHandle extends PulsarMockLedgerHandle {
+            public MockLedgerHandle(PulsarMockBookKeeper bk, long id, 
DigestType digest, byte[] passwd)
+                    throws GeneralSecurityException {
+                super(bk, id, digest, passwd);
+            }
+
+            @Override
+            public void asyncAddEntry(final byte[] data, final AddCallback cb, 
final Object ctx) {
+                // do nothing
+            }
+
+            @Override
+            public void 
asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback cb, Object 
ctx) {
+                cb.closeComplete(BKException.Code.OK, this, ctx);
+            }
+        }
+        MockLedgerHandle ledgerHandle = mock(MockLedgerHandle.class);
+        final byte[] data = "data".getBytes();
+        // this will make first entry to be timed out but then managed-ledger 
will create a new ledger and next time add
+        // entry should be successful.
+        doNothing().when(ledgerHandle).asyncAddEntry(data, null, null);
+
+        MockedPulsarServiceBaseTest.setFieldValue(ManagedLedgerImpl.class, ml, 
"currentLedger", ledgerHandle);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean addedSuccessfully = new AtomicBoolean(false);
+        producer.sendAsync(data).handle((res, ex) -> {
+            if (ex == null) {
+                addedSuccessfully.set(true);
+            } else {
+                log.error("add-entry failed for {}", methodName, ex);
+            }
+            latch.countDown();
+            return null;
+        });
+        latch.await();
+
+        // broker should be resilient enough to add-entry timeout and add 
entry successfully.
+        assertTrue(addedSuccessfully.get());
+
+        byte[] receivedData = consumer.receive().getData();
+        assertEquals(receivedData, data);
+
+        producer.close();
+        consumer.close();
+    }
+    
 }
diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml
index bbb3383..616873b 100644
--- a/site/_data/config/broker.yaml
+++ b/site/_data/config/broker.yaml
@@ -245,6 +245,9 @@ configs:
 - name: managedLedgerReadEntryTimeoutSeconds
   default: '120'
   description: Read entries timeout when broker tries to read messages from 
bookkeeper.
+- name: managedLedgerAddEntryTimeoutSeconds
+  default: '120'
+  description: Add entry timeout when broker tries to publish message to 
bookkeeper (0 to disable it).
 - name: loadBalancerEnabled
   default: 'true'
   description: Enable load balancer

Reply via email to