This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a704afd Support Add-entry timeout at broker to avoid stuck topics (#3347) a704afd is described below commit a704afd1946c5648f06b12031320359d5aef623d Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Tue Jan 15 07:16:41 2019 -0800 Support Add-entry timeout at broker to avoid stuck topics (#3347) ### Motivation Recently, and in the past, we have seen a few instances where a bookie crashes because it runs out of memory and the kernel panics. In those cases the broker never completes the add-entry callback, and as a result topics get stuck on pending writes. ``` 2010-01-01T00:18:19+0000 2010-01-01 00:18:19,594 21907 report_stuck:556 WARNING persistent://prop1/global/prod-rt-mxt/extractions-partition-0 is stuck (broker1.us-west1.com): waiting for write 2010-01-01T00:18:22+0000 2010-01-01 00:18:22,654 21907 report_stuck:556 WARNING persistent://prop1/global/mbr-events/27_bf1 is stuck (broker1.us-west1.com): waiting for write 2010-01-01T00:19:10+0000 2010-01-01 00:19:10,790 21907 report_stuck:556 WARNING persistent://prop1/global/mbr-events/30_bf1 is stuck (broker1.us-west1.com): waiting for write 2010-01-01T00:19:14+0000 2010-01-01 00:19:14,692 21907 report_stuck:556 WARNING persistent://prop1/global/mbr-events/35_gq2 is stuck (broker1.us-west1.com): waiting for write 2010-01-01T00:19:15+0000 2010-01-01 00:19:15,809 21907 report_stuck:556 WARNING persistent://prop1/global/jedi-events/batchevents-partition-35 is stuck (broker1.us-west1.com): waiting for write 2010-01-01T00:19:25+0000 2010-01-01 00:19:25,127 21907 report_stuck:556 WARNING persistent://prop1/global/jedi-events/mailevents-partition-24 is stuck (broker1.us-west1.com): waiting for write ``` internal-stats ``` { "entriesAddedCounter" : 66066317, "numberOfEntries" : 14129359, "totalSize" : 145505085493, "currentLedgerEntries" : 238938, "currentLedgerSize" : 3715327400, "lastLedgerCreatedTimestamp" : "2018-11-27 00:03:03.125+0000", "waitingCursorsCount" : 0, 
"pendingAddEntriesCount" : 252514, "lastConfirmedEntry" : "489781963:26121", "state" : "ClosingLedger", "ledgers" : [ { "ledgerId" : 489765238, "entries" : 559855, "size" : 6928710817 }, { ... ``` ### Modifications Add support for an add-entry (write) timeout, which can be disabled by setting managedLedgerAddEntryTimeoutSeconds=0. ### Result A topic will no longer get stuck on pending writes if an add-entry callback never completes. --- conf/broker.conf | 3 + conf/standalone.conf | 3 + .../bookkeeper/mledger/ManagedLedgerConfig.java | 16 ++++- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 30 ++++++++ .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 67 +++++++++++++++--- .../bookkeeper/client/PulsarMockLedgerHandle.java | 2 +- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 82 ++++++++++++++++++++++ .../apache/pulsar/broker/ServiceConfiguration.java | 9 ++- .../pulsar/broker/service/BrokerService.java | 1 + .../broker/auth/MockedPulsarServiceBaseTest.java | 7 ++ .../client/impl/BrokerClientIntegrationTest.java | 76 +++++++++++++++++++- site/_data/config/broker.yaml | 3 + 12 files changed, 282 insertions(+), 17 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 1cf3a8a..3c2e5ac 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -445,6 +445,9 @@ managedLedgerMetadataOperationsTimeoutSeconds=60 # Read entries timeout when broker tries to read messages from bookkeeper. managedLedgerReadEntryTimeoutSeconds=120 +# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it). +managedLedgerAddEntryTimeoutSeconds=120 + ### --- Load balancer --- ### # Enable load balancer diff --git a/conf/standalone.conf b/conf/standalone.conf index 16d6465..06be447 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -332,6 +332,9 @@ managedLedgerMetadataOperationsTimeoutSeconds=60 # Read entries timeout when broker tries to read messages from bookkeeper. 
managedLedgerReadEntryTimeoutSeconds=120 +# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it). +managedLedgerAddEntryTimeoutSeconds=120 + ### --- Load balancer --- ### loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 255b534..8f89050 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -58,7 +58,7 @@ public class ManagedLedgerConfig { private long offloadAutoTriggerSizeThresholdBytes = -1; private long metadataOperationsTimeoutSeconds = 60; private long readEntryTimeoutSeconds = 120; - + private long addEntryTimeoutSeconds = 120; private DigestType digestType = DigestType.CRC32C; private byte[] password = "".getBytes(Charsets.UTF_8); private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE; @@ -554,4 +554,18 @@ public class ManagedLedgerConfig { this.readEntryTimeoutSeconds = readEntryTimeoutSeconds; return this; } + + public long getAddEntryTimeoutSeconds() { + return addEntryTimeoutSeconds; + } + + /** + * Add-entry timeout after which add-entry callback will be failed if add-entry is not succeeded. 
+ * + * @param addEntryTimeoutSeconds + */ + public ManagedLedgerConfig setAddEntryTimeoutSeconds(long addEntryTimeoutSeconds) { + this.addEntryTimeoutSeconds = addEntryTimeoutSeconds; + return this; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index dd54630..26b3acc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.Math.min; +import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import java.time.Clock; @@ -105,6 +106,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BoundType; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -166,6 +168,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors; final EntryCache entryCache; + + private ScheduledFuture<?> timeoutTask; /** * This lock is held while the ledgers list is updated asynchronously on the metadata store. 
Since we use the store @@ -328,6 +332,28 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } }); + + scheduleTimeoutTask(); + } + + private void scheduleTimeoutTask() { + long timeoutSec = config.getAddEntryTimeoutSeconds(); + // disable timeout task checker if timeout <= 0 + if (timeoutSec > 0) { + this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(() -> { + OpAddEntry opAddEntry = pendingAddEntries.peek(); + if (opAddEntry != null) { + boolean isTimedOut = opAddEntry.lastInitTime != -1 + && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec + && opAddEntry.completed == FALSE; + if (isTimedOut) { + log.error("Failed to add entry for ledger {} in time-out {} sec", + (opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec); + opAddEntry.handleAddFailure(opAddEntry.ledger); + } + } + }, config.getAddEntryTimeoutSeconds(), config.getAddEntryTimeoutSeconds(), TimeUnit.SECONDS); + } } private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) { @@ -1146,6 +1172,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { closeAllCursors(callback, ctx); }, null); + + if (this.timeoutTask != null) { + this.timeoutTask.cancel(false); + } } private void closeAllCursors(CloseCallback callback, final Object ctx) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 5d9d57c..0cef300 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -23,7 +23,10 @@ import static com.google.common.base.Preconditions.checkArgument; import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; + +import java.util.concurrent.ScheduledFuture; import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; @@ -35,6 +38,8 @@ import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.bookkeeper.util.SafeRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE; +import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE; /** * Handles the life-cycle of an addEntry() operation. @@ -42,7 +47,7 @@ import org.slf4j.LoggerFactory; */ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { private ManagedLedgerImpl ml; - private LedgerHandle ledger; + LedgerHandle ledger; private long entryId; @SuppressWarnings("unused") @@ -50,6 +55,11 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { private Object ctx; private boolean closeWhenDone; private long startTime; + volatile long lastInitTime; + private static final AtomicIntegerFieldUpdater<OpAddEntry> COMPLETED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(OpAddEntry.class, "completed"); + @SuppressWarnings("unused") + volatile int completed = FALSE; ByteBuf data; private int dataLength; @@ -67,6 +77,7 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { op.closeWhenDone = false; op.entryId = -1; op.startTime = System.nanoTime(); + op.completed = FALSE; ml.mbean.addAddEntrySample(op.dataLength); if (log.isDebugEnabled()) { log.debug("Created new OpAddEntry {}", op); @@ -86,6 +97,7 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { ByteBuf duplicateBuffer = data.retainedDuplicate(); // duplicatedBuffer has refCnt=1 at this point + lastInitTime = System.nanoTime(); ledger.asyncAddEntry(duplicateBuffer, this, ctx); // Internally, 
asyncAddEntry() is refCnt neutral to respect to the passed buffer and it will keep a ref on it @@ -95,6 +107,9 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { } public void failed(ManagedLedgerException e) { + if (!checkAndCompleteTimeoutTask()) { + return; + } AddEntryCallback cb = callbackUpdater.getAndSet(this, null); data.release(); if (cb != null) { @@ -119,17 +134,11 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { } if (rc != BKException.Code.OK) { - // If we get a write error, we will try to create a new ledger and re-submit the pending writes. If the - // ledger creation fails (persistent bk failure, another instanche owning the ML, ...), then the writes will - // be marked as failed. - ml.mbean.recordAddEntryError(); - - ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() -> { - // Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock - // from a BK callback. - ml.ledgerClosed(lh); - })); + handleAddFailure(lh); } else { + if(!checkAndCompleteTimeoutTask()) { + return; + } // Trigger addComplete callback in a thread hashed on the managed ledger name ml.getExecutor().executeOrdered(ml.getName(), this); } @@ -200,6 +209,40 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { ml.mbean.addAddEntryLatencySample(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } + /** + * It cancels timeout task and checks if add-entry operation is not completed yet. + * + * @return true if task is not already completed else returns false. + */ + private boolean checkAndCompleteTimeoutTask() { + if (!COMPLETED_UPDATER.compareAndSet(this, FALSE, TRUE)) { + if (log.isDebugEnabled()) { + log.debug("Add-entry already completed for {}-{}", this.ledger != null ? this.ledger.getId() : -1, + this.entryId); + } + return false; + } + return true; + } + + /** + * It handles add failure on the given ledger. 
it can be triggered when add-entry fails or times out. + * + * @param ledger + */ + void handleAddFailure(final LedgerHandle ledger) { + // If we get a write error, we will try to create a new ledger and re-submit the pending writes. If the + // ledger creation fails (persistent bk failure, another instanche owning the ML, ...), then the writes will + // be marked as failed. + ml.mbean.recordAddEntryError(); + + ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() -> { + // Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock + // from a BK callback. + ml.ledgerClosed(ledger); + })); + } + private final Handle<OpAddEntry> recyclerHandle; private OpAddEntry(Handle<OpAddEntry> recyclerHandle) { @@ -220,8 +263,10 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { callback = null; ctx = null; closeWhenDone = false; + completed = FALSE; entryId = -1; startTime = -1; + lastInitTime = -1; recyclerHandle.recycle(this); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index cedd3b0..2397a62 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -59,7 +59,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle { long lastEntry = -1; boolean fenced = false; - PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, + public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd) throws GeneralSecurityException { super(bk, id, new LedgerMetadata(3, 3, 2, DigestType.MAC, "".getBytes()), DigestType.MAC, "".getBytes(), EnumSet.noneOf(WriteFlag.class)); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index a16dcd2..6ed13a5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -22,6 +22,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -34,8 +35,10 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.nio.charset.Charset; +import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -46,10 +49,12 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -57,6 +62,13 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.LedgerMetadata; +import org.apache.bookkeeper.client.PulsarMockBookKeeper; +import 
org.apache.bookkeeper.client.PulsarMockLedgerHandle; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.api.WriteFlag; +import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; @@ -2324,4 +2336,74 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ledger.close(); } + + /** + * It verifies that if bk-client doesn't complete the add-entry in given time out then broker is resilient enought + * to create new ledger and add entry successfully. + * + * + * @throws Exception + */ + @Test(timeOut = 20000) + public void testManagedLedgerWithAddEntryTimeOut() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig().setAddEntryTimeoutSeconds(1); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("timeout_ledger_test", config); + + BookKeeper bk = mock(BookKeeper.class); + doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any()); + + PulsarMockBookKeeper bkClient = mock(PulsarMockBookKeeper.class); + ClientConfiguration conf = new ClientConfiguration(); + doReturn(conf).when(bkClient).getConf(); + class MockLedgerHandle extends PulsarMockLedgerHandle { + public MockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd) + throws GeneralSecurityException { + super(bk, id, digest, passwd); + } + + @Override + public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) { + // do nothing + } + + @Override + public void asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback cb, Object ctx) { + cb.closeComplete(BKException.Code.OK, this, ctx); + } + } + MockLedgerHandle ledgerHandle = mock(MockLedgerHandle.class); + final String 
data = "data"; + doNothing().when(ledgerHandle).asyncAddEntry(data.getBytes(), null, null); + AtomicBoolean addSuccess = new AtomicBoolean(); + + setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", ledgerHandle); + + final int totalAddEntries = 1; + CountDownLatch latch = new CountDownLatch(totalAddEntries); + ledger.asyncAddEntry(data.getBytes(), new AddEntryCallback() { + + @Override + public void addComplete(Position position, Object ctx) { + addSuccess.set(true); + latch.countDown(); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + latch.countDown(); + } + }, null); + + latch.await(); + + assertTrue(addSuccess.get()); + + setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", null); + } + + private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(classObj, fieldValue); + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 9625987..048aecd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -757,14 +757,17 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "operation timeout while updating managed-ledger metadata." 
) private long managedLedgerMetadataOperationsTimeoutSeconds = 60; + @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Read entries timeout when broker tries to read messages from bookkeeper " - + "(disable timeout by setting readTimeoutSeconds <= 0)" + + "(0 to disable it)" ) - private long managedLedgerReadEntryTimeoutSeconds = 60; + private long managedLedgerReadEntryTimeoutSeconds = 120; - + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "Add entry timeout when broker tries to publish message to bookkeeper.(0 to disable it)") + private long managedLedgerAddEntryTimeoutSeconds = 120; /*** --- Load balancer --- ****/ @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b2b5103..ff2a8dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -745,6 +745,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies managedLedgerConfig.setMetadataOperationsTimeoutSeconds( serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds()); managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds()); + managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds()); managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize()); managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum()); managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index e0130d2..0720d3e 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; +import java.lang.reflect.Field; import java.net.URI; import java.net.URL; import java.util.ArrayList; @@ -290,5 +291,11 @@ public abstract class MockedPulsarServiceBaseTest { } } + public static void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(classObj, fieldValue); + } + private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index 6988863..feb5b9e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -23,9 +23,12 @@ import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONF import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -33,6 +36,7 @@ import static org.testng.Assert.fail; import 
java.lang.reflect.Field; import java.net.URI; +import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.IdentityHashMap; import java.util.List; @@ -46,16 +50,24 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.PulsarMockBookKeeper; +import org.apache.bookkeeper.client.PulsarMockLedgerHandle; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.namespace.OwnershipCache; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; @@ -757,4 +769,66 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { } } + @Test(timeOut = 20000) + public void testAddEntryOperationTimeout() throws Exception { + + log.info("-- Starting {} test --", methodName); + + conf.setManagedLedgerAddEntryTimeoutSeconds(1); + + final String topicName = "persistent://my-property/my-ns/addEntryTimeoutTopic"; + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + ConsumerImpl<byte[]> 
consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) + .subscriptionName("my-subscriber-name").subscribe(); + + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); + + class MockLedgerHandle extends PulsarMockLedgerHandle { + public MockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd) + throws GeneralSecurityException { + super(bk, id, digest, passwd); + } + + @Override + public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) { + // do nothing + } + + @Override + public void asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback cb, Object ctx) { + cb.closeComplete(BKException.Code.OK, this, ctx); + } + } + MockLedgerHandle ledgerHandle = mock(MockLedgerHandle.class); + final byte[] data = "data".getBytes(); + // this will make first entry to be timed out but then managed-ledger will create a new ledger and next time add + // entry should be successful. + doNothing().when(ledgerHandle).asyncAddEntry(data, null, null); + + MockedPulsarServiceBaseTest.setFieldValue(ManagedLedgerImpl.class, ml, "currentLedger", ledgerHandle); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean addedSuccessfully = new AtomicBoolean(false); + producer.sendAsync(data).handle((res, ex) -> { + if (ex == null) { + addedSuccessfully.set(true); + } else { + log.error("add-entry failed for {}", methodName, ex); + } + latch.countDown(); + return null; + }); + latch.await(); + + // broker should be resilient enough to add-entry timeout and add entry successfully. 
+ assertTrue(addedSuccessfully.get()); + + byte[] receivedData = consumer.receive().getData(); + assertEquals(receivedData, data); + + producer.close(); + consumer.close(); + } + } diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml index bbb3383..616873b 100644 --- a/site/_data/config/broker.yaml +++ b/site/_data/config/broker.yaml @@ -245,6 +245,9 @@ configs: - name: managedLedgerReadEntryTimeoutSeconds default: '120' description: Read entries timeout when broker tries to read messages from bookkeeper. +- name: managedLedgerAddEntryTimeoutSeconds + default: '120' + description: Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it). - name: loadBalancerEnabled default: 'true' description: Enable load balancer