This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5b3889c114b1e95275f5e1998d21c527b0f24c29 Author: Sijie Guo <[email protected]> AuthorDate: Tue Jun 30 15:32:53 2020 -0700 Catch NPE and detect state doesn't move (#7401) (cherry picked from commit 86e2610bebb46f479ea221a69782cd8575a04b16) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 49 +++++++++++++++------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ea0614d..810b3f8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -404,7 +404,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { }; // Create a new ledger to start writing - this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); + this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis(); mbean.startDataLedgerCreateOp(); asyncCreateLedger(bookKeeper, config, digestType, (rc, lh, ctx) -> { @@ -596,13 +596,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (log.isDebugEnabled()) { log.debug("[{}] Queue addEntry request", name); } + if (State.CreatingLedger == state) { + long elapsedMs = System.currentTimeMillis() - this.lastLedgerCreationInitiationTimestamp; + if (elapsedMs > TimeUnit.SECONDS.toMillis(2 * config.getMetadataOperationsTimeoutSeconds())) { + log.info("[{}] Ledger creation was initiated {} ms ago but it never completed" + + " and creation timeout task didn't kick in as well. Force to fail the create ledger operation ..."); + this.createComplete(Code.TimeoutException, null, null); + } + } } else if (state == State.ClosedLedger) { // No ledger and no pending operations. 
Create a new ledger - if (log.isDebugEnabled()) { - log.debug("[{}] Creating a new ledger", name); - } + log.info("[{}] Creating a new ledger", name); if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) { - this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); + this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis(); mbean.startDataLedgerCreateOp(); asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap()); } @@ -1229,8 +1235,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { metadataMutex.unlock(); updateLedgersIdsComplete(stat); synchronized (ManagedLedgerImpl.this) { - mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp, - TimeUnit.NANOSECONDS); + mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp, + TimeUnit.MILLISECONDS); } } @@ -1380,11 +1386,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (!pendingAddEntries.isEmpty()) { // Need to create a new ledger to write pending entries - if (log.isDebugEnabled()) { - log.debug("[{}] Creating a new ledger", name); - } + log.info("[{}] Creating a new ledger", name); STATE_UPDATER.set(this, State.CreatingLedger); - this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); + this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis(); mbean.startDataLedgerCreateOp(); asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap()); } @@ -3172,15 +3176,28 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { Map<String, byte[]> finalMetadata = new HashMap<>(); finalMetadata.putAll(ledgerMetadata); finalMetadata.putAll(metadata); - if (log.isDebugEnabled()) { - log.debug("creating ledger, metadata: "+finalMetadata); - } - bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), + 
log.info("[{}] Creating ledger, metadata: {} - metadata ops timeout : {} seconds", + name, finalMetadata, config.getMetadataOperationsTimeoutSeconds()); + try { + bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata); + } catch (Throwable cause) { + log.error("[{}] Encountered unexpected error when creating ledger", + name, cause); + cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated); + return; + } scheduledExecutor.schedule(() -> { if (!ledgerCreated.get()) { - cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated); + if (log.isDebugEnabled()) { + log.debug("[{}] Timeout creating ledger", name); + } + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Ledger already created when timeout task is triggered", name); + } } + cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated); }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS); }
