rdhabalia closed pull request #2535: Add ledger op timeout to avoid topics stuck on ledger-creation
URL: https://github.com/apache/incubator-pulsar/pull/2535
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index 8927a85f79..2277cc8167 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -378,6 +378,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
 # corrupted at bookkeeper and managed-cursor is stuck at that ledger.
 autoSkipNonRecoverableData=false
 
+# operation timeout while updating managed-ledger metadata.
+managedLedgerMetadataOperationsTimeoutSeconds=60
+
 ### --- Load balancer --- ###
 
 # Enable load balancer
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a68664cd8d..cc8f56417f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -318,6 +318,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
 # corrupted at bookkeeper and managed-cursor is stuck at that ledger.
 autoSkipNonRecoverableData=false
 
+# operation timeout while updating managed-ledger metadata.
+managedLedgerMetadataOperationsTimeoutSeconds=60
+
 ### --- Load balancer --- ###
 
 loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 698d2459ae..5967453ca5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -56,6 +56,7 @@
     private boolean autoSkipNonRecoverableData;
     private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
     private long offloadAutoTriggerSizeThresholdBytes = -1;
+    private long metadataOperationsTimeoutSeconds = 60;
 
     private DigestType digestType = DigestType.CRC32C;
     private byte[] password = "".getBytes(Charsets.UTF_8);
@@ -511,4 +512,24 @@ public ManagedLedgerConfig setClock(Clock clock) {
         this.clock = clock;
         return this;
     }
+
+    /**
+     * 
+     * Ledger-Op (Create/Delete) timeout
+     * 
+     * @return
+     */
+    public long getMetadataOperationsTimeoutSeconds() {
+        return metadataOperationsTimeoutSeconds;
+    }
+
+    /**
+     * Ledger-Op (Create/Delete) timeout after which callback will be 
completed with failure
+     * 
+     * @param metadataOperationsTimeoutSeconds
+     */
+    public ManagedLedgerConfig setMetadataOperationsTimeoutSeconds(long 
metadataOperationsTimeoutSeconds) {
+        this.metadataOperationsTimeoutSeconds = 
metadataOperationsTimeoutSeconds;
+        return this;
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 50b47229d6..0ac818fa34 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1974,68 +1974,73 @@ void internalFlushPendingMarkDeletes() {
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
 
-        bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), 
config.getMetadataWriteQuorumSize(),
-                config.getMetadataAckQuorumSize(), digestType, 
config.getPassword(), (rc, lh, ctx) -> {
-                    ledger.getExecutor().execute(safeRun(() -> {
-                        ledger.mbean.endCursorLedgerCreateOp();
-                        if (rc != BKException.Code.OK) {
-                            log.warn("[{}] Error creating ledger for cursor 
{}: {}", ledger.getName(), name,
-                                    BKException.getMessage(rc));
-                            callback.operationFailed(new 
ManagedLedgerException(BKException.getMessage(rc)));
-                            return;
-                        }
+        ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) 
-> {
+
+            if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                return;
+            }
 
+            ledger.getExecutor().execute(safeRun(() -> {
+                ledger.mbean.endCursorLedgerCreateOp();
+                if (rc != BKException.Code.OK) {
+                    log.warn("[{}] Error creating ledger for cursor {}: {}", 
ledger.getName(), name,
+                            BKException.getMessage(rc));
+                    callback.operationFailed(new 
ManagedLedgerException(BKException.getMessage(rc)));
+                    return;
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Created ledger {} for cursor {}", 
ledger.getName(), lh.getId(), name);
+                }
+                // Created the ledger, now write the last position
+                // content
+                MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
+                persistPositionToLedger(lh, mdEntry, new VoidCallback() {
+                    @Override
+                    public void operationComplete() {
                         if (log.isDebugEnabled()) {
-                            log.debug("[{}] Created ledger {} for cursor {}", 
ledger.getName(), lh.getId(), name);
+                            log.debug("[{}] Persisted position {} for cursor 
{}", ledger.getName(),
+                                    mdEntry.newPosition, name);
                         }
-                        // Created the ledger, now write the last position
-                        // content
-                        MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
-                        persistPositionToLedger(lh, mdEntry, new 
VoidCallback() {
+                        switchToNewLedger(lh, new VoidCallback() {
                             @Override
                             public void operationComplete() {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Persisted position {} for 
cursor {}", ledger.getName(),
-                                            mdEntry.newPosition, name);
-                                }
-                                switchToNewLedger(lh, new VoidCallback() {
-                                    @Override
-                                    public void operationComplete() {
-                                        callback.operationComplete();
-                                    }
-
-                                    @Override
-                                    public void 
operationFailed(ManagedLedgerException exception) {
-                                        // it means it failed to switch the 
newly created ledger so, it should be
-                                        // deleted to prevent leak
-                                        
bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> {
-                                            if (rc != BKException.Code.OK) {
-                                                log.warn("[{}] Failed to 
delete orphan ledger {}", ledger.getName(),
-                                                        lh.getId());
-                                            }
-                                        }, null);
-                                        callback.operationFailed(exception);
-                                    }
-                                });
+                                callback.operationComplete();
                             }
 
                             @Override
                             public void operationFailed(ManagedLedgerException 
exception) {
-                                log.warn("[{}] Failed to persist position {} 
for cursor {}", ledger.getName(),
-                                        mdEntry.newPosition, name);
-
-                                ledger.mbean.startCursorLedgerDeleteOp();
-                                bookkeeper.asyncDeleteLedger(lh.getId(), new 
DeleteCallback() {
-                                    @Override
-                                    public void deleteComplete(int rc, Object 
ctx) {
-                                        ledger.mbean.endCursorLedgerDeleteOp();
+                                // it means it failed to switch the newly 
created ledger so, it should be
+                                // deleted to prevent leak
+                                bookkeeper.asyncDeleteLedger(lh.getId(), (int 
rc, Object ctx) -> {
+                                    if (rc != BKException.Code.OK) {
+                                        log.warn("[{}] Failed to delete orphan 
ledger {}", ledger.getName(),
+                                                lh.getId());
                                     }
                                 }, null);
                                 callback.operationFailed(exception);
                             }
                         });
-                    }));
-                }, null, Collections.emptyMap());
+                    }
+
+                    @Override
+                    public void operationFailed(ManagedLedgerException 
exception) {
+                        log.warn("[{}] Failed to persist position {} for 
cursor {}", ledger.getName(),
+                                mdEntry.newPosition, name);
+
+                        ledger.mbean.startCursorLedgerDeleteOp();
+                        bookkeeper.asyncDeleteLedger(lh.getId(), new 
DeleteCallback() {
+                            @Override
+                            public void deleteComplete(int rc, Object ctx) {
+                                ledger.mbean.endCursorLedgerDeleteOp();
+                            }
+                        }, null);
+                        callback.operationFailed(exception);
+                    }
+                });
+            }));
+        }, Collections.emptyMap());
+       
     }
 
     private List<LongProperty> buildPropertiesMap(Map<String, Long> 
properties) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6e4a6090c5..99039cf77b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -22,17 +22,6 @@
 import static java.lang.Math.min;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
-import com.google.common.collect.BoundType;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Range;
-import com.google.common.util.concurrent.RateLimiter;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
 import java.time.Clock;
 import java.util.Collections;
 import java.util.Iterator;
@@ -51,6 +40,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
@@ -62,6 +52,7 @@
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.util.Backoff;
@@ -111,6 +102,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.BoundType;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Range;
+import com.google.common.util.concurrent.RateLimiter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private final static long MegaByte = 1024 * 1024;
 
@@ -185,7 +187,7 @@
 
     protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
     protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
-
+    
     enum State {
         None, // Uninitialized
         LedgerOpened, // A ledger is ready to write into
@@ -364,39 +366,44 @@ public void operationFailed(MetaStoreException e) {
         // Create a new ledger to start writing
         this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
         mbean.startDataLedgerCreateOp();
-        bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(), config.getAckQuorumSize(),
-                digestType, config.getPassword(), (rc, lh, ctx) -> {
-                    executor.executeOrdered(name, safeRun(() -> {
-                        mbean.endDataLedgerCreateOp();
-                        if (rc != BKException.Code.OK) {
-                            
callback.initializeFailed(createManagedLedgerException(rc));
-                            return;
-                        }
+        
+        asyncCreateLedger(bookKeeper, config, digestType, (rc, lh, ctx) -> {
 
-                        log.info("[{}] Created ledger {}", name, lh.getId());
-                        STATE_UPDATER.set(this, State.LedgerOpened);
-                        lastLedgerCreatedTimestamp = clock.millis();
-                        currentLedger = lh;
-
-                        lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
-                        // bypass empty ledgers, find last ledger with Message 
if possible.
-                        while (lastConfirmedEntry.getEntryId() == -1) {
-                            Map.Entry<Long, LedgerInfo> formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
-                            if (formerLedger != null) {
-                                LedgerInfo ledgerInfo = 
formerLedger.getValue();
-                                lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
-                            } else {
-                                break;
-                            }
-                        }
+            if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                return;
+            }
+
+            executor.executeOrdered(name, safeRun(() -> {
+                mbean.endDataLedgerCreateOp();
+                if (rc != BKException.Code.OK) {
+                    
callback.initializeFailed(createManagedLedgerException(rc));
+                    return;
+                }
 
-                        LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
-                        ledgers.put(lh.getId(), info);
+                log.info("[{}] Created ledger {}", name, lh.getId());
+                STATE_UPDATER.set(this, State.LedgerOpened);
+                lastLedgerCreatedTimestamp = clock.millis();
+                currentLedger = lh;
+
+                lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
+                // bypass empty ledgers, find last ledger with Message if 
possible.
+                while (lastConfirmedEntry.getEntryId() == -1) {
+                    Map.Entry<Long, LedgerInfo> formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+                    if (formerLedger != null) {
+                        LedgerInfo ledgerInfo = formerLedger.getValue();
+                        lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+                    } else {
+                        break;
+                    }
+                }
+
+                LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
+                ledgers.put(lh.getId(), info);
 
-                        // Save it back to ensure all nodes exist
-                        store.asyncUpdateLedgerIds(name, 
getManagedLedgerInfo(), ledgersStat, storeLedgersCb);
-                    }));
-                }, null, Collections.emptyMap());
+                // Save it back to ensure all nodes exist
+                store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), 
ledgersStat, storeLedgersCb);
+            }));
+        }, Collections.emptyMap());
     }
 
     private void initializeCursors(final ManagedLedgerInitializeLedgerCallback 
callback) {
@@ -564,9 +571,7 @@ private synchronized void internalAsyncAddEntry(OpAddEntry 
addOperation) {
             if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, 
State.CreatingLedger)) {
                 this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
                 mbean.startDataLedgerCreateOp();
-                bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(),
-                        config.getAckQuorumSize(), digestType, 
config.getPassword(), this, null,
-                        Collections.emptyMap());
+                asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap());
             }
         } else {
             checkArgument(state == State.LedgerOpened, "ledger=%s is not 
opened", state);
@@ -1155,6 +1160,11 @@ public synchronized void createComplete(int rc, final 
LedgerHandle lh, Object ct
         if (log.isDebugEnabled()) {
             log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != 
null ? lh.getId() : -1);
         }
+        
+        if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+            return;
+        }
+        
         mbean.endDataLedgerCreateOp();
         if (rc != BKException.Code.OK) {
             log.error("[{}] Error creating ledger rc={} {}", name, rc, 
BKException.getMessage(rc));
@@ -1319,9 +1329,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
             STATE_UPDATER.set(this, State.CreatingLedger);
             this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
             mbean.startDataLedgerCreateOp();
-            bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, 
config.getPassword(), this, null,
-                    Collections.emptyMap());
+            asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap());
         }
     }
 
@@ -2795,6 +2803,52 @@ public static ManagedLedgerException 
createManagedLedgerException(Throwable t) {
         }
     }
 
+    /**
+     * Create ledger async and schedule a timeout task to check 
ledger-creation is complete else it fails the callback
+     * with TimeoutException.
+     * 
+     * @param bookKeeper
+     * @param config
+     * @param digestType
+     * @param cb
+     * @param emptyMap
+     */
+    protected void asyncCreateLedger(BookKeeper bookKeeper, 
ManagedLedgerConfig config, DigestType digestType,
+            CreateCallback cb, Map<Object, Object> emptyMap) {
+        AtomicBoolean ledgerCreated = new AtomicBoolean(false);
+        bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(), config.getAckQuorumSize(),
+                digestType, config.getPassword(), cb, ledgerCreated, 
Collections.emptyMap());
+        scheduledExecutor.schedule(() -> {
+            if (!ledgerCreated.get()) {
+                ledgerCreated.set(true);
+                cb.createComplete(BKException.Code.TimeoutException, null, 
null);
+            }
+        }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
+    }
+
+    /**
+     * check if ledger-op task is already completed by timeout-task. If 
completed then delete the created ledger
+     * 
+     * @param rc
+     * @param lh
+     * @param ctx
+     * @return
+     */
+    protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, 
Object ctx) {
+        if (ctx != null && ctx instanceof AtomicBoolean) {
+            // ledger-creation is already timed out and callback is already 
completed so, delete this ledger and return.
+            if (((AtomicBoolean) (ctx)).get()) {
+                if (rc == BKException.Code.OK) {
+                    log.warn("[{}]-{} ledger creation timed-out, deleting 
ledger", this.name, lh.getId());
+                    asyncDeleteLedger(lh.getId(), 
DEFAULT_LEDGER_DELETE_RETRIES);
+                }
+                return true;
+            }
+            ((AtomicBoolean) ctx).set(true);
+        }
+        return false;
+    }
+    
     private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
 }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d7cce973bd..aa85d8027b 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -25,18 +29,12 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
-import com.google.common.base.Charsets;
-import com.google.common.collect.Sets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -47,10 +45,15 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
@@ -87,6 +90,13 @@
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
 public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
     private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerTest.class);
@@ -2215,4 +2225,27 @@ public void testManagedLedgerWithoutAutoCreate() throws 
Exception {
 
         
assertFalse(factory.getManagedLedgers().containsKey("testManagedLedgerWithoutAutoCreate"));
     }
+    
+    @Test
+    public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception {
+        ManagedLedgerConfig config = new 
ManagedLedgerConfig().setMetadataOperationsTimeoutSeconds(3);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("timeout_ledger_test", config);
+        
+        BookKeeper bk = mock(BookKeeper.class);
+        doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), 
any(), any(), any(), any(), any());
+        AtomicInteger response = new AtomicInteger(0);
+        CountDownLatch latch = new CountDownLatch(1);
+        ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
+            @Override
+            public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+                response.set(rc);
+                latch.countDown();
+            }
+        }, Collections.emptyMap());
+
+        latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, 
TimeUnit.SECONDS);
+        assertEquals(response.get(), BKException.Code.TimeoutException);
+        
+        ledger.close();
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 65696f7338..84cc569cb8 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -373,6 +373,8 @@
     // corrupted at bookkeeper and managed-cursor is stuck at that ledger.
     @FieldContext(dynamic = true)
     private boolean autoSkipNonRecoverableData = false;
+    // operation timeout while updating managed-ledger metadata.
+    private long managedLedgerMetadataOperationsTimeoutSeconds = 60;
 
     /*** --- Load balancer --- ****/
     // Enable load balancer
@@ -1314,6 +1316,14 @@ public void setAutoSkipNonRecoverableData(boolean 
skipNonRecoverableLedger) {
         this.autoSkipNonRecoverableData = skipNonRecoverableLedger;
     }
 
+    public long getManagedLedgerMetadataOperationsTimeoutSeconds() {
+        return managedLedgerMetadataOperationsTimeoutSeconds;
+    }
+
+    public void setManagedLedgerMetadataOperationsTimeoutSeconds(long 
managedLedgerMetadataOperationsTimeoutSeconds) {
+        this.managedLedgerMetadataOperationsTimeoutSeconds = 
managedLedgerMetadataOperationsTimeoutSeconds;
+    }
+
     public boolean isLoadBalancerEnabled() {
         return loadBalancerEnabled;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8620e82a87..406d3e55c8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -755,6 +755,8 @@ public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                     TimeUnit.MINUTES);
             managedLedgerConfig.setMaxSizePerLedgerMb(2048);
 
+            managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+                    
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
             
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
             
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
             
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to