This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d5e88c1  Add ledger op timeout to avoid topics stuck on 
ledger-creation (#2535)
d5e88c1 is described below

commit d5e88c1ec16df557655e42c9f648a2fd3343d759
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Sun Sep 16 21:55:21 2018 -0700

    Add ledger op timeout to avoid topics stuck on ledger-creation (#2535)
    
    * Add ledger op timeout to avoid topics stuck on ledger-creation
    
    * rename to metadataOperationsTimeoutSeconds
    
    * ad service config for managedLedgerMetadataOperationsTimeoutSeconds
---
 conf/broker.conf                                   |   3 +
 conf/standalone.conf                               |   3 +
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |  21 +++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 101 +++++++-------
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 150 ++++++++++++++-------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  47 ++++++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  10 ++
 .../pulsar/broker/service/BrokerService.java       |   2 +
 8 files changed, 234 insertions(+), 103 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 8927a85..2277cc8 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -378,6 +378,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
 # corrupted at bookkeeper and managed-cursor is stuck at that ledger.
 autoSkipNonRecoverableData=false
 
+# operation timeout while updating managed-ledger metadata.
+managedLedgerMetadataOperationsTimeoutSeconds=60
+
 ### --- Load balancer --- ###
 
 # Enable load balancer
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a68664c..cc8f564 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -318,6 +318,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
 # corrupted at bookkeeper and managed-cursor is stuck at that ledger.
 autoSkipNonRecoverableData=false
 
+# operation timeout while updating managed-ledger metadata.
+managedLedgerMetadataOperationsTimeoutSeconds=60
+
 ### --- Load balancer --- ###
 
 loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 698d245..5967453 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -56,6 +56,7 @@ public class ManagedLedgerConfig {
     private boolean autoSkipNonRecoverableData;
     private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
     private long offloadAutoTriggerSizeThresholdBytes = -1;
+    private long metadataOperationsTimeoutSeconds = 60;
 
     private DigestType digestType = DigestType.CRC32C;
     private byte[] password = "".getBytes(Charsets.UTF_8);
@@ -511,4 +512,24 @@ public class ManagedLedgerConfig {
         this.clock = clock;
         return this;
     }
+
+    /**
+     * 
+     * Ledger-Op (Create/Delete) timeout
+     * 
+     * @return
+     */
+    public long getMetadataOperationsTimeoutSeconds() {
+        return metadataOperationsTimeoutSeconds;
+    }
+
+    /**
+     * Ledger-Op (Create/Delete) timeout after which callback will be 
completed with failure
+     * 
+     * @param metadataOperationsTimeoutSeconds
+     */
+    public ManagedLedgerConfig setMetadataOperationsTimeoutSeconds(long 
metadataOperationsTimeoutSeconds) {
+        this.metadataOperationsTimeoutSeconds = 
metadataOperationsTimeoutSeconds;
+        return this;
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 50b4722..0ac818f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1974,68 +1974,73 @@ public class ManagedCursorImpl implements ManagedCursor 
{
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
 
-        bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), 
config.getMetadataWriteQuorumSize(),
-                config.getMetadataAckQuorumSize(), digestType, 
config.getPassword(), (rc, lh, ctx) -> {
-                    ledger.getExecutor().execute(safeRun(() -> {
-                        ledger.mbean.endCursorLedgerCreateOp();
-                        if (rc != BKException.Code.OK) {
-                            log.warn("[{}] Error creating ledger for cursor 
{}: {}", ledger.getName(), name,
-                                    BKException.getMessage(rc));
-                            callback.operationFailed(new 
ManagedLedgerException(BKException.getMessage(rc)));
-                            return;
-                        }
+        ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) 
-> {
+
+            if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                return;
+            }
 
+            ledger.getExecutor().execute(safeRun(() -> {
+                ledger.mbean.endCursorLedgerCreateOp();
+                if (rc != BKException.Code.OK) {
+                    log.warn("[{}] Error creating ledger for cursor {}: {}", 
ledger.getName(), name,
+                            BKException.getMessage(rc));
+                    callback.operationFailed(new 
ManagedLedgerException(BKException.getMessage(rc)));
+                    return;
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Created ledger {} for cursor {}", 
ledger.getName(), lh.getId(), name);
+                }
+                // Created the ledger, now write the last position
+                // content
+                MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
+                persistPositionToLedger(lh, mdEntry, new VoidCallback() {
+                    @Override
+                    public void operationComplete() {
                         if (log.isDebugEnabled()) {
-                            log.debug("[{}] Created ledger {} for cursor {}", 
ledger.getName(), lh.getId(), name);
+                            log.debug("[{}] Persisted position {} for cursor 
{}", ledger.getName(),
+                                    mdEntry.newPosition, name);
                         }
-                        // Created the ledger, now write the last position
-                        // content
-                        MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
-                        persistPositionToLedger(lh, mdEntry, new 
VoidCallback() {
+                        switchToNewLedger(lh, new VoidCallback() {
                             @Override
                             public void operationComplete() {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Persisted position {} for 
cursor {}", ledger.getName(),
-                                            mdEntry.newPosition, name);
-                                }
-                                switchToNewLedger(lh, new VoidCallback() {
-                                    @Override
-                                    public void operationComplete() {
-                                        callback.operationComplete();
-                                    }
-
-                                    @Override
-                                    public void 
operationFailed(ManagedLedgerException exception) {
-                                        // it means it failed to switch the 
newly created ledger so, it should be
-                                        // deleted to prevent leak
-                                        
bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> {
-                                            if (rc != BKException.Code.OK) {
-                                                log.warn("[{}] Failed to 
delete orphan ledger {}", ledger.getName(),
-                                                        lh.getId());
-                                            }
-                                        }, null);
-                                        callback.operationFailed(exception);
-                                    }
-                                });
+                                callback.operationComplete();
                             }
 
                             @Override
                             public void operationFailed(ManagedLedgerException 
exception) {
-                                log.warn("[{}] Failed to persist position {} 
for cursor {}", ledger.getName(),
-                                        mdEntry.newPosition, name);
-
-                                ledger.mbean.startCursorLedgerDeleteOp();
-                                bookkeeper.asyncDeleteLedger(lh.getId(), new 
DeleteCallback() {
-                                    @Override
-                                    public void deleteComplete(int rc, Object 
ctx) {
-                                        ledger.mbean.endCursorLedgerDeleteOp();
+                                // it means it failed to switch the newly 
created ledger so, it should be
+                                // deleted to prevent leak
+                                bookkeeper.asyncDeleteLedger(lh.getId(), (int 
rc, Object ctx) -> {
+                                    if (rc != BKException.Code.OK) {
+                                        log.warn("[{}] Failed to delete orphan 
ledger {}", ledger.getName(),
+                                                lh.getId());
                                     }
                                 }, null);
                                 callback.operationFailed(exception);
                             }
                         });
-                    }));
-                }, null, Collections.emptyMap());
+                    }
+
+                    @Override
+                    public void operationFailed(ManagedLedgerException 
exception) {
+                        log.warn("[{}] Failed to persist position {} for 
cursor {}", ledger.getName(),
+                                mdEntry.newPosition, name);
+
+                        ledger.mbean.startCursorLedgerDeleteOp();
+                        bookkeeper.asyncDeleteLedger(lh.getId(), new 
DeleteCallback() {
+                            @Override
+                            public void deleteComplete(int rc, Object ctx) {
+                                ledger.mbean.endCursorLedgerDeleteOp();
+                            }
+                        }, null);
+                        callback.operationFailed(exception);
+                    }
+                });
+            }));
+        }, Collections.emptyMap());
+       
     }
 
     private List<LongProperty> buildPropertiesMap(Map<String, Long> 
properties) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f332e17..d33f151 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -22,17 +22,6 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static java.lang.Math.min;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
-import com.google.common.collect.BoundType;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Range;
-import com.google.common.util.concurrent.RateLimiter;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
 import java.time.Clock;
 import java.util.Collections;
 import java.util.Iterator;
@@ -51,6 +40,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
@@ -62,6 +52,7 @@ import 
org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.util.Backoff;
@@ -111,6 +102,17 @@ import 
org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.BoundType;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Range;
+import com.google.common.util.concurrent.RateLimiter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private final static long MegaByte = 1024 * 1024;
 
@@ -185,7 +187,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
     protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
     protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
-
+    
     enum State {
         None, // Uninitialized
         LedgerOpened, // A ledger is ready to write into
@@ -364,39 +366,44 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         // Create a new ledger to start writing
         this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
         mbean.startDataLedgerCreateOp();
-        bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(), config.getAckQuorumSize(),
-                digestType, config.getPassword(), (rc, lh, ctx) -> {
-                    executor.executeOrdered(name, safeRun(() -> {
-                        mbean.endDataLedgerCreateOp();
-                        if (rc != BKException.Code.OK) {
-                            
callback.initializeFailed(createManagedLedgerException(rc));
-                            return;
-                        }
+        
+        asyncCreateLedger(bookKeeper, config, digestType, (rc, lh, ctx) -> {
 
-                        log.info("[{}] Created ledger {}", name, lh.getId());
-                        STATE_UPDATER.set(this, State.LedgerOpened);
-                        lastLedgerCreatedTimestamp = clock.millis();
-                        currentLedger = lh;
-
-                        lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
-                        // bypass empty ledgers, find last ledger with Message 
if possible.
-                        while (lastConfirmedEntry.getEntryId() == -1) {
-                            Map.Entry<Long, LedgerInfo> formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
-                            if (formerLedger != null) {
-                                LedgerInfo ledgerInfo = 
formerLedger.getValue();
-                                lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
-                            } else {
-                                break;
-                            }
-                        }
+            if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                return;
+            }
+
+            executor.executeOrdered(name, safeRun(() -> {
+                mbean.endDataLedgerCreateOp();
+                if (rc != BKException.Code.OK) {
+                    
callback.initializeFailed(createManagedLedgerException(rc));
+                    return;
+                }
 
-                        LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
-                        ledgers.put(lh.getId(), info);
+                log.info("[{}] Created ledger {}", name, lh.getId());
+                STATE_UPDATER.set(this, State.LedgerOpened);
+                lastLedgerCreatedTimestamp = clock.millis();
+                currentLedger = lh;
+
+                lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
+                // bypass empty ledgers, find last ledger with Message if 
possible.
+                while (lastConfirmedEntry.getEntryId() == -1) {
+                    Map.Entry<Long, LedgerInfo> formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+                    if (formerLedger != null) {
+                        LedgerInfo ledgerInfo = formerLedger.getValue();
+                        lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+                    } else {
+                        break;
+                    }
+                }
+
+                LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
+                ledgers.put(lh.getId(), info);
 
-                        // Save it back to ensure all nodes exist
-                        store.asyncUpdateLedgerIds(name, 
getManagedLedgerInfo(), ledgersStat, storeLedgersCb);
-                    }));
-                }, null, Collections.emptyMap());
+                // Save it back to ensure all nodes exist
+                store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), 
ledgersStat, storeLedgersCb);
+            }));
+        }, Collections.emptyMap());
     }
 
     private void initializeCursors(final ManagedLedgerInitializeLedgerCallback 
callback) {
@@ -564,9 +571,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, 
State.CreatingLedger)) {
                 this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
                 mbean.startDataLedgerCreateOp();
-                bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(),
-                        config.getAckQuorumSize(), digestType, 
config.getPassword(), this, null,
-                        Collections.emptyMap());
+                asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap());
             }
         } else {
             checkArgument(state == State.LedgerOpened, "ledger=%s is not 
opened", state);
@@ -1155,6 +1160,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         if (log.isDebugEnabled()) {
             log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != 
null ? lh.getId() : -1);
         }
+        
+        if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+            return;
+        }
+        
         mbean.endDataLedgerCreateOp();
         if (rc != BKException.Code.OK) {
             log.error("[{}] Error creating ledger rc={} {}", name, rc, 
BKException.getMessage(rc));
@@ -1320,9 +1330,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             STATE_UPDATER.set(this, State.CreatingLedger);
             this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
             mbean.startDataLedgerCreateOp();
-            bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, 
config.getPassword(), this, null,
-                    Collections.emptyMap());
+            asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap());
         }
     }
 
@@ -2796,6 +2804,52 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
+    /**
+     * Create ledger async and schedule a timeout task to check 
ledger-creation is complete else it fails the callback
+     * with TimeoutException.
+     * 
+     * @param bookKeeper
+     * @param config
+     * @param digestType
+     * @param cb
+     * @param emptyMap
+     */
+    protected void asyncCreateLedger(BookKeeper bookKeeper, 
ManagedLedgerConfig config, DigestType digestType,
+            CreateCallback cb, Map<Object, Object> emptyMap) {
+        AtomicBoolean ledgerCreated = new AtomicBoolean(false);
+        bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(), config.getAckQuorumSize(),
+                digestType, config.getPassword(), cb, ledgerCreated, 
Collections.emptyMap());
+        scheduledExecutor.schedule(() -> {
+            if (!ledgerCreated.get()) {
+                ledgerCreated.set(true);
+                cb.createComplete(BKException.Code.TimeoutException, null, 
null);
+            }
+        }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
+    }
+
+    /**
+     * check if ledger-op task is already completed by timeout-task. If 
completed then delete the created ledger
+     * 
+     * @param rc
+     * @param lh
+     * @param ctx
+     * @return
+     */
+    protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, 
Object ctx) {
+        if (ctx != null && ctx instanceof AtomicBoolean) {
+            // ledger-creation is already timed out and callback is already 
completed so, delete this ledger and return.
+            if (((AtomicBoolean) (ctx)).get()) {
+                if (rc == BKException.Code.OK) {
+                    log.warn("[{}]-{} ledger creation timed-out, deleting 
ledger", this.name, lh.getId());
+                    asyncDeleteLedger(lh.getId(), 
DEFAULT_LEDGER_DELETE_RETRIES);
+                }
+                return true;
+            }
+            ((AtomicBoolean) ctx).set(true);
+        }
+        return false;
+    }
+    
     private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
 }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d7cce97..aa85d80 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -25,18 +29,12 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
-import com.google.common.base.Charsets;
-import com.google.common.collect.Sets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -47,10 +45,15 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
@@ -87,6 +90,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
 public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
     private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTest.class);
@@ -2215,4 +2225,27 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 
         
assertFalse(factory.getManagedLedgers().containsKey("testManagedLedgerWithoutAutoCreate"));
     }
+    
+    @Test
+    public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception {
+        ManagedLedgerConfig config = new 
ManagedLedgerConfig().setMetadataOperationsTimeoutSeconds(3);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("timeout_ledger_test", config);
+        
+        BookKeeper bk = mock(BookKeeper.class);
+        doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), 
any(), any(), any(), any(), any());
+        AtomicInteger response = new AtomicInteger(0);
+        CountDownLatch latch = new CountDownLatch(1);
+        ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
+            @Override
+            public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+                response.set(rc);
+                latch.countDown();
+            }
+        }, Collections.emptyMap());
+
+        latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, 
TimeUnit.SECONDS);
+        assertEquals(response.get(), BKException.Code.TimeoutException);
+        
+        ledger.close();
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 65696f7..84cc569 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -373,6 +373,8 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     // corrupted at bookkeeper and managed-cursor is stuck at that ledger.
     @FieldContext(dynamic = true)
     private boolean autoSkipNonRecoverableData = false;
+    // operation timeout while updating managed-ledger metadata.
+    private long managedLedgerMetadataOperationsTimeoutSeconds = 60;
 
     /*** --- Load balancer --- ****/
     // Enable load balancer
@@ -1314,6 +1316,14 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         this.autoSkipNonRecoverableData = skipNonRecoverableLedger;
     }
 
+    public long getManagedLedgerMetadataOperationsTimeoutSeconds() {
+        return managedLedgerMetadataOperationsTimeoutSeconds;
+    }
+
+    public void setManagedLedgerMetadataOperationsTimeoutSeconds(long 
managedLedgerMetadataOperationsTimeoutSeconds) {
+        this.managedLedgerMetadataOperationsTimeoutSeconds = 
managedLedgerMetadataOperationsTimeoutSeconds;
+    }
+
     public boolean isLoadBalancerEnabled() {
         return loadBalancerEnabled;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8620e82..406d3e5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -755,6 +755,8 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                     TimeUnit.MINUTES);
             managedLedgerConfig.setMaxSizePerLedgerMb(2048);
 
+            managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+                    
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
             
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
             
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
             
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());

Reply via email to