This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5310f74 Fixed managed ledger missing callback issue when unloading a topic (#1171) 5310f74 is described below commit 5310f74887f62f271213eb7f5191682b05a209c3 Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Feb 5 08:40:45 2018 -0800 Fixed managed ledger missing callback issue when unloading a topic (#1171) --- .../bookkeeper/mledger/ManagedLedgerException.java | 8 +++- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +++-- .../mledger/impl/ManagedLedgerBkTest.java | 43 ++++++++++++++++++++++ .../broker/service/BrokerServiceException.java | 6 +++ .../org/apache/pulsar/broker/service/Producer.java | 9 ++++- .../broker/service/persistent/PersistentTopic.java | 15 +++++++- 6 files changed, 83 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index f5c4243..9ad8284 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -37,7 +37,7 @@ public class ManagedLedgerException extends Exception { } return new ManagedLedgerException(e); } - + public static class MetaStoreException extends ManagedLedgerException { public MetaStoreException(Exception e) { super(e); @@ -84,6 +84,12 @@ public class ManagedLedgerException extends Exception { } } + public static class ManagedLedgerAlreadyClosedException extends ManagedLedgerException { + public ManagedLedgerAlreadyClosedException(String msg) { + super(msg); + } + } + public static class CursorAlreadyClosedException extends ManagedLedgerException { public CursorAlreadyClosedException(String msg) { super(msg); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 9bbea72..68a7645 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -20,8 +20,7 @@ package org.apache.bookkeeper.mledger.impl; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.Math.min; -import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; -import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import java.util.Iterator; @@ -59,6 +58,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; @@ -92,7 +92,6 @@ import com.google.common.util.concurrent.RateLimiter; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final static long MegaByte = 1024 * 1024; @@ -1203,6 +1202,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { final State state = STATE_UPDATER.get(this); if (state == State.ClosingLedger || state == State.LedgerOpened) { STATE_UPDATER.set(this, State.ClosedLedger); + } else if (state == State.Closed) { + // The managed ledger was closed during the write operation + clearPendingAddEntries(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed")); + return; } else { // In case we get multiple write errors for different outstanding write request, we should close the ledger // just once diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 5c27ffb..a945759 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -30,10 +30,13 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.BookKeeperTestClient; +import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -461,4 +464,44 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase { factory2.shutdown(); factory.shutdown(); } + + @Test(timeOut = 30000) + public void managedLedgerClosed() throws Exception { + ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2); + ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory.open("my_test_ledger", config); + + int N = 100; + + AtomicReference<ManagedLedgerException> res = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(N); + + for (int i = 0; i < N; i++) { + ledger1.asyncAddEntry(("entry-" + i).getBytes(), new AddEntryCallback() { + + @Override + public void addComplete(Position position, Object ctx) { + latch.countDown(); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + res.compareAndSet(null, exception); + latch.countDown(); + } + }, null); + + if (i == 1) { + ledger1.close(); + } + } + + // Ensures all the callback must have been invoked + latch.await(); + assertNotNull(res.get()); + assertEquals(res.get().getClass(), ManagedLedgerAlreadyClosedException.class); + factory.shutdown(); + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 2a6a195..1127237 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -47,6 +47,12 @@ public class BrokerServiceException extends Exception { } } + public static class TopicClosedException extends BrokerServiceException { + public TopicClosedException(Throwable t) { + super(t); + } + } + public static class PersistenceException extends BrokerServiceException { public PersistenceException(Throwable t) { super(t); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index e97f489..72b12d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.bookkeeper.mledger.util.Rate; +import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; import org.apache.pulsar.broker.service.Topic.PublishContext; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; @@ -279,8 +280,12 @@ public class Producer { ? ServerError.TopicTerminatedError : ServerError.PersistenceError; producer.cnx.ctx().channel().eventLoop().execute(() -> { - producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, sequenceId, serverError, - exception.getMessage())); + if (!(exception instanceof TopicClosedException)) { + // For TopicClosed exception there's no need to send explicit error, since the client was + // already notified + producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, sequenceId, + serverError, exception.getMessage())); + } producer.cnx.completedSendOperation(producer.isNonPersistentTopic); producer.publishOperationCompleted(); recycle(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d38fa3b..0bc36ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -44,6 +44,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException; import org.apache.bookkeeper.mledger.Position; @@ -59,6 +60,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceExcept import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; +import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException; @@ -253,7 +255,18 @@ public class PersistentTopic implements Topic, AddEntryCallback { @Override public void addFailed(ManagedLedgerException exception, Object ctx) { PublishContext callback = (PublishContext) ctx; - log.error("[{}] Failed to persist msg in store: {}", topic, exception.getMessage()); + + if (exception instanceof ManagedLedgerAlreadyClosedException) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage()); + } + + callback.completed(new TopicClosedException(exception), -1, -1); + return; + + } else { + log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage()); + } if (exception instanceof ManagedLedgerTerminatedException) { // Signal the producer that this topic is no longer available -- To stop receiving notification emails like this one, please contact mme...@apache.org.