Repository: bookkeeper Updated Branches: refs/heads/master 6cfecea6c -> 48ab23ef3
BOOKKEEPER-868: Add ADD_ENTRY quorum timeout (Leigh Stewart via sijie) Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/48ab23ef Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/48ab23ef Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/48ab23ef Branch: refs/heads/master Commit: 48ab23ef3e16f8edfda2a977f9c0fc99c441db40 Parents: 6cfecea Author: Sijie Guo <[email protected]> Authored: Tue Oct 6 01:48:46 2015 -0700 Committer: Sijie Guo <[email protected]> Committed: Tue Oct 6 01:48:46 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/bookkeeper/client/BKException.java | 11 ++ .../apache/bookkeeper/client/PendingAddOp.java | 51 ++++++- .../bookkeeper/conf/ClientConfiguration.java | 24 ++++ .../apache/bookkeeper/proto/BookieClient.java | 6 + .../client/TestAddEntryQuorumTimeout.java | 144 +++++++++++++++++++ .../test/BookKeeperClusterTestCase.java | 2 +- 7 files changed, 233 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 14a9bed..d626197 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -104,6 +104,8 @@ Trunk (unreleased changes) BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Tong Yu via sijie) + BOOKKEEPER-868: Add ADD_ENTRY quorum timeout (Leigh Stewart via sijie) + bookkeeper-server: BOOKKEEPER-695: Some entry logs are not removed from the bookie storage (Matteo Merli via sijie) http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java index b43506d..3991085 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java @@ -94,6 +94,8 @@ public abstract class BKException extends Exception { return new BKClientClosedException(); case Code.IllegalOpException: return new BKIllegalOpException(); + case Code.AddEntryQuorumTimeoutException: + return new BKAddEntryQuorumTimeoutException(); default: return new BKUnexpectedConditionException(); } @@ -125,6 +127,7 @@ public abstract class BKException extends Exception { int MetaStoreException = -18; int ClientClosedException = -19; int LedgerExistException = -20; + int AddEntryQuorumTimeoutException = -21; int IllegalOpException = -100; int LedgerFencedException = -101; @@ -203,6 +206,8 @@ public abstract class BKException extends Exception { return "BookKeeper client is closed"; case Code.IllegalOpException: return "Invalid operation"; + case Code.AddEntryQuorumTimeoutException: + return "Add entry quorum wait timed out"; default: return "Unexpected condition"; } @@ -250,6 +255,12 @@ public abstract class BKException extends Exception { } } + public static class BKAddEntryQuorumTimeoutException extends BKException { + public BKAddEntryQuorumTimeoutException() { + super(Code.AddEntryQuorumTimeoutException); + } + } + public static class BKUnexpectedConditionException extends BKException { public BKUnexpectedConditionException() { super(Code.UnexpectedConditionException); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index fb87677..4034c35 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -27,9 +27,14 @@ import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.SafeRunnable; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; /** * This represents a pending add operation. When it has got success from all @@ -40,7 +45,7 @@ import org.slf4j.LoggerFactory; * * */ -class PendingAddOp implements WriteCallback { +class PendingAddOp implements WriteCallback, TimerTask { private final static Logger LOG = LoggerFactory.getLogger(PendingAddOp.class); ChannelBuffer toSend; @@ -56,6 +61,10 @@ class PendingAddOp implements WriteCallback { LedgerHandle lh; boolean isRecoveryAdd = false; long requestTimeNanos; + + final int timeoutSec; + Timeout timeout = null; + OpStatsLogger addOpLogger; PendingAddOp(LedgerHandle lh, AddCallback cb, Object ctx) { @@ -63,10 +72,9 @@ class PendingAddOp implements WriteCallback { this.cb = cb; this.ctx = ctx; this.entryId = LedgerHandle.INVALID_ENTRY_ID; - - ackSet = lh.distributionSchedule.getAckSet(); - - addOpLogger = lh.bk.getAddOpLogger(); + this.ackSet = lh.distributionSchedule.getAckSet(); + this.addOpLogger = lh.bk.getAddOpLogger(); + this.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout(); } /** @@ -90,6 +98,31 @@ class PendingAddOp implements WriteCallback { this, bookieIndex, flags); } + @Override + public void run(Timeout timeout) { + timeoutQuorumWait(); + } + + void timeoutQuorumWait() { + try { + lh.bk.mainWorkerPool.submitOrdered(lh.ledgerId, new SafeRunnable() { + @Override + public void safeRun() { + if (completed) { + return; + } + lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException); + } + @Override + public String toString() { + return String.format("AddEntryQuorumTimeout(lid=%d, eid=%d)", lh.ledgerId, entryId); + } + }); + } catch (RejectedExecutionException e) { + LOG.warn("Timeout add entry quorum wait failed {} entry: {}", lh.ledgerId, entryId); + } + } + void unsetSuccessAndSendWriteRequest(int bookieIndex) { if (toSend == null) { // this addOp hasn't yet had its mac computed. When the mac is @@ -131,7 +164,10 @@ class PendingAddOp implements WriteCallback { } void initiate(ChannelBuffer toSend, int entryLength) { - requestTimeNanos = MathUtils.nowInNano(); + if (timeoutSec > -1) { + this.timeout = lh.bk.bookieClient.scheduleTimeout(this, timeoutSec, TimeUnit.SECONDS); + } + this.requestTimeNanos = MathUtils.nowInNano(); this.toSend = toSend; this.entryLength = entryLength; for (int bookieIndex : writeSet) { @@ -190,6 +226,9 @@ class PendingAddOp implements WriteCallback { } void submitCallback(final int rc) { + if (null != timeout) { + timeout.cancel(); + } long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); if (rc != BKException.Code.OK) { addOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index dde6d3a..fdbfd53 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -59,6 +59,7 @@ public class ClientConfiguration extends AbstractConfiguration { protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout"; // Timeout Setting protected final static String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec"; + protected final static String ADD_ENTRY_QUORUM_TIMEOUT_SEC = "addEntryQuorumTimeoutSec"; protected final static String READ_ENTRY_TIMEOUT_SEC = "readEntryTimeoutSec"; protected final static String TIMEOUT_TASK_INTERVAL_MILLIS = "timeoutTaskIntervalMillis"; protected final static String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = "pcbcTimeoutTimerTickDurationMs"; @@ -434,6 +435,29 @@ public class ClientConfiguration extends AbstractConfiguration { } /** + * Get the timeout for top-level add request. That is, the amount of time we should spend + * waiting for ack quorum. + * + * @return add entry ack quorum timeout. + */ + public int getAddEntryQuorumTimeout() { + return getInt(ADD_ENTRY_QUORUM_TIMEOUT_SEC, -1); + } + + /** + * Set timeout for top-level add entry request. + * @see #getAddEntryQuorumTimeout() + * + * @param timeout + * The new add entry ack quorum timeout in seconds. + * @return client configuration. + */ + public ClientConfiguration setAddEntryQuorumTimeout(int timeout) { + setProperty(ADD_ENTRY_QUORUM_TIMEOUT_SEC, timeout); + return this; + } + + /** * Get the timeout for read entry. This is the number of seconds we wait without hearing * a response for read entry request from a bookie before we consider it failed. By default, * we use socket timeout specified at {@link #getReadTimeout()}. http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index 87d1865..909cdd0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -47,6 +47,8 @@ import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -248,6 +250,10 @@ public class BookieClient implements PerChannelBookieClientFactory { return closed; } + public Timeout scheduleTimeout(TimerTask task, long timeoutSec, TimeUnit timeUnit) { + return requestTimer.newTimeout(task, timeoutSec, timeUnit); + } + public void close() { closeLock.writeLock().lock(); try { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java new file mode 100644 index 0000000..da62fe0 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java @@ -0,0 +1,144 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +public class TestAddEntryQuorumTimeout extends BookKeeperClusterTestCase implements AddCallback { + + final static Logger logger = LoggerFactory.getLogger(TestAddEntryQuorumTimeout.class); + + final DigestType digestType; + final byte[] testPasswd = "".getBytes(); + + public TestAddEntryQuorumTimeout() { + super(3); + baseClientConf.setAddEntryTimeout(10); + baseClientConf.setAddEntryQuorumTimeout(1); + this.digestType = DigestType.CRC32; + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + baseConf.setZkServers(zkUtil.getZooKeeperConnectString()); + } + + private static class SyncObj { + volatile int counter = 0; + volatile int rc = -1; + public SyncObj() { + counter = 0; + } + } + + @Override + public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { + SyncObj x = (SyncObj) ctx; + synchronized (x) { + x.rc = rc; + x.counter++; + x.notify(); + } + } + + @Test(timeout = 60000) + public void testBasicTimeout() throws Exception { + BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf); + LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd); + List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble; + byte[] data = "foobar".getBytes(); + lh.addEntry(data); + sleepBookie(curEns.get(0), 5).await(); + try { + lh.addEntry(data); + Assert.fail("should have thrown"); + } catch (BKException.BKAddEntryQuorumTimeoutException ex) { + } + } + + private void waitForSyncObj(SyncObj syncObj) throws Exception { + synchronized (syncObj) { + while (syncObj.counter < 1) { + logger.debug("Entries counter = " + syncObj.counter); + syncObj.wait(); + } + } + } + + @Test(timeout = 60000) + public void testTimeoutWithPendingOps() throws Exception { + BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf); + LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd); + List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble; + byte[] data = "foobar".getBytes(); + + SyncObj syncObj1 = new SyncObj(); + SyncObj syncObj2 = new SyncObj(); + SyncObj syncObj3 = new SyncObj(); + + lh.addEntry(data); + sleepBookie(curEns.get(0), 5).await(); + lh.asyncAddEntry(data, this, syncObj1); + lh.asyncAddEntry(data, this, syncObj2); + lh.asyncAddEntry(data, this, syncObj3); + + waitForSyncObj(syncObj1); + Assert.assertEquals(BKException.Code.AddEntryQuorumTimeoutException, syncObj1.rc); + waitForSyncObj(syncObj2); + Assert.assertEquals(BKException.Code.AddEntryQuorumTimeoutException, syncObj2.rc); + waitForSyncObj(syncObj3); + Assert.assertEquals(BKException.Code.AddEntryQuorumTimeoutException, syncObj3.rc); + } + + @Test(timeout = 60000) + public void testLedgerClosedAfterTimeout() throws Exception { + BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf); + LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd); + List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble; + byte[] data = "foobar".getBytes(); + CountDownLatch b0latch = sleepBookie(curEns.get(0), 5); + try { + lh.addEntry(data); + Assert.fail("should have thrown"); + } catch (BKException.BKAddEntryQuorumTimeoutException ex) { + } + b0latch.await(); + try { + lh.addEntry(data); + Assert.fail("should have thrown"); + } catch (BKException.BKLedgerClosedException ex) { + } + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48ab23ef/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index ced2c9f..fce689d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -275,7 +275,7 @@ public abstract class BookKeeperClusterTestCase { * Socket Address * @param seconds * Sleep seconds - * @return Count Down latch which will be counted down when sleep finishes + * @return Count Down latch which will be counted down just after sleep begins * @throws InterruptedException * @throws IOException */
