This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new f38905e ZOOKEEPER-3437: Improve sync throttling on a learner master
f38905e is described below
commit f38905e58e4961ae04d876e1855d6ccb372a706b
Author: Jie Huang <[email protected]>
AuthorDate: Thu Jul 25 21:41:50 2019 +0200
ZOOKEEPER-3437: Improve sync throttling on a learner master
Author: Jie Huang <[email protected]>
Reviewers: Michael Han <[email protected]>, Enrico Olivelli
<[email protected]>, Fangmin Lyu <[email protected]>
Closes #995 from jhuan31/ZOOKEEPER-3437
---
.../src/main/resources/markdown/zookeeperAdmin.md | 10 +
.../zookeeper/server/quorum/FollowerBean.java | 28 +++
.../zookeeper/server/quorum/FollowerMXBean.java | 20 ++
.../org/apache/zookeeper/server/quorum/Leader.java | 31 +--
.../apache/zookeeper/server/quorum/LeaderBean.java | 20 ++
.../zookeeper/server/quorum/LeaderMXBean.java | 20 ++
.../zookeeper/server/quorum/LearnerHandler.java | 45 +++--
.../zookeeper/server/quorum/LearnerMaster.java | 121 +++++++++---
.../zookeeper/server/quorum/LearnerSnapshot.java | 44 -----
.../server/quorum/LearnerSnapshotThrottler.java | 136 -------------
.../server/quorum/LearnerSyncThrottler.java | 121 ++++++++++++
.../zookeeper/server/quorum/ObserverMaster.java | 24 +--
...leException.java => SyncThrottleException.java} | 20 +-
.../quorum/LearnerSnapshotThrottlerTest.java | 219 ---------------------
.../server/quorum/LearnerSyncThrottlerTest.java | 173 ++++++++++++++++
.../server/quorum/QuorumPeerMainTest.java | 30 +--
16 files changed, 545 insertions(+), 517 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index cb79a9f..1690ce6 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -893,6 +893,16 @@ property, when available, is noted below.
pipeline to avoid direct buffer OOM. It will disable the AUTO_READ in
Netty.
+* *maxConcurrentSnapSyncs* :
+ (Java system property: **zookeeper.leader.maxConcurrentSnapSyncs**)
+ The maximum number of snap syncs a leader or a follower can serve at the
same
+ time. The default is 10.
+
+* *maxConcurrentDiffSyncs* :
+ (Java system property: **zookeeper.leader.maxConcurrentDiffSyncs**)
+ The maximum number of diff syncs a leader or a follower can serve at the
same
+ time. The default is 100.
+
<a name="sc_clusterOptions"></a>
#### Cluster Options
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java
index edfc9c7..420c694 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java
@@ -62,4 +62,32 @@ public class FollowerBean extends ZooKeeperServerBean
implements FollowerMXBean
public void setObserverMasterPacketSizeLimit(int sizeLimit) {
ObserverMaster.setPktsSizeLimit(sizeLimit);
}
+
+ @Override
+ public int getMaxConcurrentSnapSyncs() {
+ final ObserverMaster om = follower.om;
+ return om == null ? -1 : om.getMaxConcurrentSnapSyncs();
+ }
+
+ @Override
+ public void setMaxConcurrentSnapSyncs(int maxConcurrentSnapshots) {
+ final ObserverMaster om = follower.om;
+ if (om != null) {
+ om.setMaxConcurrentSnapSyncs(maxConcurrentSnapshots);
+ }
+ }
+
+ @Override
+ public int getMaxConcurrentDiffSyncs() {
+ final ObserverMaster om = follower.om;
+ return om == null ? -1 : om.getMaxConcurrentDiffSyncs();
+ }
+
+ @Override
+ public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs) {
+ final ObserverMaster om = follower.om;
+ if (om != null) {
+ om.setMaxConcurrentDiffSyncs(maxConcurrentDiffSyncs);
+ }
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java
index 6b4edd0..9875e34 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java
@@ -53,4 +53,24 @@ public interface FollowerMXBean extends
ZooKeeperServerMXBean {
* set the size limit in bytes for the observer master commit packet queue
*/
public void setObserverMasterPacketSizeLimit(int sizeLimit);
+
+ /**
+ * @return Number of concurrent snapshots permitted to send to observers
+ */
+ public int getMaxConcurrentSnapSyncs();
+
+ /**
+ * @param maxConcurrentSnapSyncs Number of concurrent snapshots permitted
to send to observers
+ */
+ public void setMaxConcurrentSnapSyncs(int maxConcurrentSnapSyncs);
+
+ /**
+ * @return Number of concurrent diff syncs permitted to send to observers
+ */
+ public int getMaxConcurrentDiffSyncs();
+
+ /**
+ * @param maxConcurrentDiffSyncs Number of concurrent diff syncs permitted
to send to observers
+ */
+ public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs);
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index a5a16f4..8d90996 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -69,7 +69,7 @@ import org.slf4j.LoggerFactory;
/**
* This class has the control logic for the Leader.
*/
-public class Leader implements LearnerMaster {
+public class Leader extends LearnerMaster {
private static final Logger LOG = LoggerFactory.getLogger(Leader.class);
static final private boolean nodelay =
System.getProperty("leader.nodelay", "true").equals("true");
@@ -87,22 +87,8 @@ public class Leader implements LearnerMaster {
}
}
- // Throttle when there are too many concurrent snapshots being sent to
observers
- private static final String MAX_CONCURRENT_SNAPSHOTS =
"zookeeper.leader.maxConcurrentSnapshots";
- private static final int maxConcurrentSnapshots;
- private static final String MAX_CONCURRENT_SNAPSHOT_TIMEOUT =
"zookeeper.leader.maxConcurrentSnapshotTimeout";
- private static final long maxConcurrentSnapshotTimeout;
- static {
- maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS,
10);
- LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots);
- maxConcurrentSnapshotTimeout =
Long.getLong(MAX_CONCURRENT_SNAPSHOT_TIMEOUT, 5);
- LOG.info(MAX_CONCURRENT_SNAPSHOT_TIMEOUT + " = " +
maxConcurrentSnapshotTimeout);
- }
-
- private final LearnerSnapshotThrottler learnerSnapshotThrottler;
-
// log ack latency if zxid is a multiple of ackLoggingFrequency. If <=0,
disable logging.
- protected static final String ACK_LOGGING_FREQUENCY =
"zookeeper.leader.ackLoggingFrequency";
+ private static final String ACK_LOGGING_FREQUENCY =
"zookeeper.leader.ackLoggingFrequency";
private static int ackLoggingFrequency;
static {
ackLoggingFrequency = Integer.getInteger(ACK_LOGGING_FREQUENCY, 1000);
@@ -137,12 +123,6 @@ public class Leader implements LearnerMaster {
return proposalStats;
}
- public LearnerSnapshotThrottler createLearnerSnapshotThrottler(
- int maxConcurrentSnapshots, long maxConcurrentSnapshotTimeout) {
- return new LearnerSnapshotThrottler(
- maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
- }
-
// beans for all learners
private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean>
connectionBeans = new ConcurrentHashMap<>();
@@ -324,8 +304,6 @@ public class Leader implements LearnerMaster {
throw e;
}
this.zk = zk;
- this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
- maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
}
/**
@@ -1217,11 +1195,6 @@ public class Leader implements LearnerMaster {
return p;
}
- @Override
- public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
- return learnerSnapshotThrottler;
- }
-
/**
* Process sync requests
*
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java
index 1c178f6..0f02d5c 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java
@@ -84,4 +84,24 @@ public class LeaderBean extends ZooKeeperServerBean
implements LeaderMXBean {
public void resetProposalStatistics() {
leader.getProposalStats().reset();
}
+
+ @Override
+ public int getMaxConcurrentSnapSyncs() {
+ return leader.getMaxConcurrentSnapSyncs();
+ }
+
+ @Override
+ public void setMaxConcurrentSnapSyncs(int maxConcurrentSnapshots) {
+ leader.setMaxConcurrentSnapSyncs(maxConcurrentSnapshots);
+ }
+
+ @Override
+ public int getMaxConcurrentDiffSyncs() {
+ return leader.getMaxConcurrentDiffSyncs();
+ }
+
+ @Override
+ public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs) {
+ leader.setMaxConcurrentDiffSyncs(maxConcurrentDiffSyncs);
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java
index 4aed186..a083b0c 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java
@@ -63,4 +63,24 @@ public interface LeaderMXBean extends ZooKeeperServerMXBean {
* Resets statistics of proposal size (min/max/last)
*/
public void resetProposalStatistics();
+
+ /**
+ * @return Number of concurrent snapshots permitted to send to observers
+ */
+ public int getMaxConcurrentSnapSyncs();
+
+ /**
+ * @param maxConcurrentSnapSyncs Number of concurrent snapshots permitted
to send to observers
+ */
+ public void setMaxConcurrentSnapSyncs(int maxConcurrentSnapSyncs);
+
+ /**
+ * @return Number of concurrent diff syncs permitted to send to observers
+ */
+ public int getMaxConcurrentDiffSyncs();
+
+ /**
+ * @param maxConcurrentDiffSyncs Number of concurrent diff syncs permitted
to send to observers
+ */
+ public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs);
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index c33fa04..e59ac6b 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -245,6 +245,11 @@ public class LearnerHandler extends ZooKeeperThread {
*/
private long leaderLastZxid;
+ /**
+ * for sync throttling
+ */
+ private LearnerSyncThrottler syncThrottler = null;
+
LearnerHandler(Socket sock, BufferedInputStream bufferedInput,
LearnerMaster learnerMaster) throws IOException {
super("LearnerHandler-" + sock.getRemoteSocketAddress());
this.sock = sock;
@@ -535,34 +540,38 @@ public class LearnerHandler extends ZooKeeperThread {
// startForwarding() will be called in all cases
boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
+ // syncs between followers and the leader are exempt from
throttling because it
+ // is important to keep the state of quorum servers up-to-date.
The exempted syncs
+ // are counted as concurrent syncs though
+ boolean exemptFromThrottle = getLearnerType() !=
LearnerType.OBSERVER;
/* if we are not truncating or sending a diff just send a snapshot
*/
if (needSnap) {
- boolean exemptFromThrottle = getLearnerType() !=
LearnerType.OBSERVER;
- LearnerSnapshot snapshot =
-
learnerMaster.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
+ syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
+ syncThrottler.beginSync(exemptFromThrottle);
try {
long zxidToSend =
learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend,
null, null), "packet");
bufferedOutput.flush();
LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid
of leader is 0x{}, "
- + "send zxid of db as 0x{}, {} concurrent
snapshots, "
- + "snapshot was {} from throttle",
+ + "send zxid of db as 0x{}, {} concurrent snapshot
sync, "
+ + "snapshot sync was {} from throttle",
Long.toHexString(peerLastZxid),
Long.toHexString(leaderLastZxid),
Long.toHexString(zxidToSend),
- snapshot.getConcurrentSnapshotNumber(),
- snapshot.isEssential() ? "exempt" : "not exempt");
+ syncThrottler.getSyncInProgress(),
+ exemptFromThrottle ? "exempt" : "not exempt");
// Dump data to peer
learnerMaster.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
} finally {
- snapshot.close();
ServerMetrics.getMetrics().SNAP_COUNT.add(1);
}
}
else {
+ syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
+ syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().DIFF_COUNT.add(1);
}
@@ -603,6 +612,9 @@ public class LearnerHandler extends ZooKeeperThread {
learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
syncLimitCheck.start();
+ // sync ends when NEWLEADER-ACK is received
+ syncThrottler.endSync();
+ syncThrottler = null;
// now that the ack has been processed expect the syncLimit
sock.setSoTimeout(learnerMaster.syncTimeout());
@@ -698,10 +710,18 @@ public class LearnerHandler extends ZooKeeperThread {
}
}
} catch (InterruptedException e) {
- LOG.error("Unexpected exception causing shutdown", e);
- } catch (SnapshotThrottleException e) {
- LOG.error("too many concurrent snapshots: " + e);
+ LOG.error("Unexpected exception in LearnerHandler: ", e);
+ } catch (SyncThrottleException e) {
+ LOG.error("too many concurrent syncs: " + e);
+ syncThrottler = null;
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in LearnerHandler: ", e);
+ throw e;
} finally {
+ if (syncThrottler != null) {
+ syncThrottler.endSync();
+ syncThrottler = null;
+ }
LOG.warn("******* GOODBYE {} ********", getRemoteAddress());
shutdown();
}
@@ -772,7 +792,7 @@ public class LearnerHandler extends ZooKeeperThread {
long minCommittedLog = db.getminCommittedLog();
long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
- LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
+ LOG.info("Synchronizing with Learner sid: {} maxCommittedLog=0x{}"
+ " minCommittedLog=0x{} lastProcessedZxid=0x{}"
+ " peerLastZxid=0x{}", getSid(),
Long.toHexString(maxCommittedLog),
@@ -1013,6 +1033,7 @@ public class LearnerHandler extends ZooKeeperThread {
public void shutdown() {
// Send the packet of death
try {
+ queuedPackets.clear();
queuedPackets.put(proposalOfDeath);
} catch (InterruptedException e) {
LOG.warn("Ignoring unexpected exception", e);
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java
index 3bffacf..db866c1 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java
@@ -21,6 +21,8 @@ package org.apache.zookeeper.server.quorum;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.Socket;
@@ -29,18 +31,81 @@ import java.net.SocketAddress;
/**
* interface for keeping Observers in sync
*/
-public interface LearnerMaster {
+public abstract class LearnerMaster {
+ private static final Logger LOG =
LoggerFactory.getLogger(LearnerMaster.class);
+
+ // Throttle when there are too many concurrent snapshots being sent to
observers
+ private static final String MAX_CONCURRENT_SNAPSYNCS =
"zookeeper.leader.maxConcurrentSnapSyncs";
+ private static final int DEFAULT_CONCURRENT_SNAPSYNCS;
+
+ // Throttle when there are too many concurrent diff syncs being sent to
observers
+ private static final String MAX_CONCURRENT_DIFF_SYNCS =
"zookeeper.leader.maxConcurrentDiffSyncs";
+ private static final int DEFAULT_CONCURRENT_DIFF_SYNCS;
+
+ static {
+ DEFAULT_CONCURRENT_SNAPSYNCS =
Integer.getInteger(MAX_CONCURRENT_SNAPSYNCS, 10);
+ LOG.info(MAX_CONCURRENT_SNAPSYNCS + " = " +
DEFAULT_CONCURRENT_SNAPSYNCS);
+
+ DEFAULT_CONCURRENT_DIFF_SYNCS =
Integer.getInteger(MAX_CONCURRENT_DIFF_SYNCS, 100);
+ LOG.info(MAX_CONCURRENT_DIFF_SYNCS + " = " +
DEFAULT_CONCURRENT_DIFF_SYNCS);
+ }
+
+ private volatile int maxConcurrentSnapSyncs = DEFAULT_CONCURRENT_SNAPSYNCS;
+ private volatile int maxConcurrentDiffSyncs =
DEFAULT_CONCURRENT_DIFF_SYNCS;
+
+ private final LearnerSyncThrottler learnerSnapSyncThrottler =
+ new LearnerSyncThrottler(maxConcurrentSnapSyncs,
LearnerSyncThrottler.SyncType.SNAP);
+
+ private final LearnerSyncThrottler learnerDiffSyncThrottler =
+ new
LearnerSyncThrottler(maxConcurrentDiffSyncs,LearnerSyncThrottler.SyncType.DIFF);
+
+ public int getMaxConcurrentSnapSyncs() {
+ return maxConcurrentSnapSyncs;
+ }
+
+ public void setMaxConcurrentSnapSyncs(int maxConcurrentSnapSyncs) {
+ LOG.info("Set maxConcurrentSnapSyncs to {}", maxConcurrentSnapSyncs);
+ this.maxConcurrentSnapSyncs = maxConcurrentSnapSyncs;
+ learnerSnapSyncThrottler.setMaxConcurrentSyncs(maxConcurrentSnapSyncs);
+ }
+
+ public int getMaxConcurrentDiffSyncs() {
+ return maxConcurrentDiffSyncs;
+ }
+
+ public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs) {
+ LOG.info("Set maxConcurrentDiffSyncs to {}", maxConcurrentDiffSyncs);
+ this.maxConcurrentDiffSyncs = maxConcurrentDiffSyncs;
+ learnerDiffSyncThrottler.setMaxConcurrentSyncs(maxConcurrentDiffSyncs);
+ }
+
+ /**
+ * snap sync throttler
+ * @return snapshot throttler
+ */
+ public LearnerSyncThrottler getLearnerSnapSyncThrottler() {
+ return learnerSnapSyncThrottler;
+ }
+
+ /**
+ * diff sync throttler
+ * @return diff throttler
+ */
+ public LearnerSyncThrottler getLearnerDiffSyncThrottler() {
+ return learnerDiffSyncThrottler;
+ }
+
/**
* start tracking a learner handler
* @param learnerHandler to track
*/
- void addLearnerHandler(LearnerHandler learnerHandler);
+ abstract void addLearnerHandler(LearnerHandler learnerHandler);
/**
* stop tracking a learner handler
* @param learnerHandler to drop
*/
- void removeLearnerHandler(LearnerHandler learnerHandler);
+ abstract void removeLearnerHandler(LearnerHandler learnerHandler);
/**
* wait for the leader of the new epoch to be confirmed by followers
@@ -49,19 +114,13 @@ public interface LearnerMaster {
* @throws IOException
* @throws InterruptedException
*/
- void waitForEpochAck(long sid, StateSummary ss) throws IOException,
InterruptedException;
-
- /**
- * snapshot throttler
- * @return snapshot throttler
- */
- LearnerSnapshotThrottler getLearnerSnapshotThrottler();
+ abstract void waitForEpochAck(long sid, StateSummary ss) throws
IOException, InterruptedException;
/**
* wait for server to start
* @throws InterruptedException
*/
- void waitForStartup() throws InterruptedException;
+ abstract void waitForStartup() throws InterruptedException;
/**
* get the first zxid of the next epoch
@@ -71,13 +130,13 @@ public interface LearnerMaster {
* @throws InterruptedException
* @throws IOException
*/
- long getEpochToPropose(long sid, long lastAcceptedEpoch) throws
InterruptedException, IOException;
+ abstract long getEpochToPropose(long sid, long lastAcceptedEpoch) throws
InterruptedException, IOException;
/**
* ZKDatabase
* @return ZKDatabase
*/
- ZKDatabase getZKDatabase();
+ abstract ZKDatabase getZKDatabase();
/**
* wait for new leader to settle
@@ -85,43 +144,43 @@ public interface LearnerMaster {
* @param zxid zxid at learner
* @throws InterruptedException
*/
- void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException;
+ abstract void waitForNewLeaderAck(long sid, long zxid) throws
InterruptedException;
/**
* last proposed zxid
* @return last proposed zxid
*/
- long getLastProposed();
+ abstract long getLastProposed();
/**
* the current tick
* @return the current tick
*/
- int getCurrentTick();
+ abstract int getCurrentTick();
/**
* time allowed for sync response
* @return time allowed for sync response
*/
- int syncTimeout();
+ abstract int syncTimeout();
/**
* deadline tick marking observer sync (initial)
* @return deadline tick marking observer sync (initial)
*/
- int getTickOfNextAckDeadline();
+ abstract int getTickOfNextAckDeadline();
/**
* next deadline tick marking observer sync (steady state)
* @return next deadline tick marking observer sync (steady state)
*/
- int getTickOfInitialAckDeadline();
+ abstract int getTickOfInitialAckDeadline();
/**
* decrement follower count
* @return previous follower count
*/
- long getAndDecrementFollowerCounter();
+ abstract long getAndDecrementFollowerCounter();
/**
* handle ack packet
@@ -129,14 +188,14 @@ public interface LearnerMaster {
* @param zxid packet zxid
* @param localSocketAddress forwarder's address
*/
- void processAck(long sid, long zxid, SocketAddress localSocketAddress);
+ abstract void processAck(long sid, long zxid, SocketAddress
localSocketAddress);
/**
* mark session as alive
* @param sess session id
* @param to timeout
*/
- void touch(long sess, int to);
+ abstract void touch(long sess, int to);
/**
* handle revalidate packet
@@ -144,13 +203,13 @@ public interface LearnerMaster {
* @param learnerHandler learner
* @throws IOException
*/
- void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler)
throws IOException;
+ abstract void revalidateSession(QuorumPacket qp, LearnerHandler
learnerHandler) throws IOException;
/**
* proxy request from learner to server
* @param si request
*/
- void submitLearnerRequest(Request si);
+ abstract void submitLearnerRequest(Request si);
/**
* begin forwarding packets to learner handler
@@ -158,39 +217,39 @@ public interface LearnerMaster {
* @param lastSeenZxid zxid of learner
* @return last zxid forwarded
*/
- long startForwarding(LearnerHandler learnerHandler, long lastSeenZxid);
+ abstract long startForwarding(LearnerHandler learnerHandler, long
lastSeenZxid);
/**
* version of current quorum verifier
* @return version of current quorum verifier
*/
- long getQuorumVerifierVersion();
+ abstract long getQuorumVerifierVersion();
/**
*
* @param sid server id
* @return server information in the view
*/
- String getPeerInfo(long sid);
+ abstract String getPeerInfo(long sid);
/**
* identifier of current quorum verifier for new leader
* @return identifier of current quorum verifier for new leader
*/
- byte[] getQuorumVerifierBytes();
+ abstract byte[] getQuorumVerifierBytes();
- QuorumAuthServer getQuorumAuthServer();
+ abstract QuorumAuthServer getQuorumAuthServer();
/**
* registers the handler's bean
* @param learnerHandler handler
* @param socket connection to learner
*/
- void registerLearnerHandlerBean(final LearnerHandler learnerHandler,
Socket socket);
+ abstract void registerLearnerHandlerBean(final LearnerHandler
learnerHandler, Socket socket);
/**
* unregisters the handler's bean
* @param learnerHandler handler
*/
- void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler);
+ abstract void unregisterLearnerHandlerBean(final LearnerHandler
learnerHandler);
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSnapshot.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSnapshot.java
deleted file mode 100644
index 0835c44..0000000
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSnapshot.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-public class LearnerSnapshot {
- private final LearnerSnapshotThrottler throttler;
- private final int concurrentSnapshotNumber;
- private final boolean essential;
-
- LearnerSnapshot(LearnerSnapshotThrottler throttler,
- int concurrentSnapshotNumber, boolean essential) {
- this.throttler = throttler;
- this.concurrentSnapshotNumber = concurrentSnapshotNumber;
- this.essential = essential;
- }
-
- public void close() {
- throttler.endSnapshot();
- }
-
- public int getConcurrentSnapshotNumber() {
- return concurrentSnapshotNumber;
- }
-
- public boolean isEssential() {
- return essential;
- }
-}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java
deleted file mode 100644
index 3542234..0000000
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.zookeeper.common.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility class to limit the number of concurrent snapshots from a leader to
- * observers and followers. {@link LearnerHandler} objects should call
- * {@link #beginSnapshot(boolean)} before sending a snapshot and
- * {@link #endSnapshot()} after finishing, successfully or not.
- *
- */
-public class LearnerSnapshotThrottler {
- private static final Logger LOG =
- LoggerFactory.getLogger(LearnerSnapshotThrottler.class);
-
- private final Object snapCountSyncObject = new Object();
- private int snapsInProgress;
-
- private final int maxConcurrentSnapshots;
- private final long timeoutMillis;
-
- /**
- * Constructs a new instance limiting the concurrent number of snapshots to
- * <code>maxConcurrentSnapshots</code>.
- * @param maxConcurrentSnapshots maximum concurrent number of snapshots
- * @param timeoutMillis milliseconds to attempt to wait when attempting to
- * begin a snapshot that would otherwise be throttled;
- * a value of zero means no waiting will be attempted
- * @throws java.lang.IllegalArgumentException when
<code>timeoutMillis</code>
- * is negative or
- *
<code>maxConcurrentSnaphots</code>
- * is less than 1
- */
- public LearnerSnapshotThrottler(int maxConcurrentSnapshots,
- long timeoutMillis) {
- if (timeoutMillis < 0) {
- String errorMsg = "timeout cannot be negative, was " +
timeoutMillis;
- throw new IllegalArgumentException(errorMsg);
- }
- if (maxConcurrentSnapshots <= 0) {
- String errorMsg = "maxConcurrentSnapshots must be positive, was " +
- maxConcurrentSnapshots;
- throw new IllegalArgumentException(errorMsg);
- }
-
- this.maxConcurrentSnapshots = maxConcurrentSnapshots;
- this.timeoutMillis = timeoutMillis;
-
- synchronized (snapCountSyncObject) {
- snapsInProgress = 0;
- }
- }
-
- public LearnerSnapshotThrottler(int maxConcurrentSnapshots) {
- this(maxConcurrentSnapshots, 0);
- }
-
- /**
- * Indicates that a new snapshot is about to be sent.
- *
- * @param essential if <code>true</code>, do not throw an exception even
- * if throttling limit is reached
- * @throws SnapshotThrottleException if throttling limit has been exceeded
- * and <code>essential == false</code>,
- * even after waiting for the timeout
- * period, if any
- * @throws InterruptedException if thread is interrupted while trying
- * to start a snapshot; cannot happen if
- * timeout is zero
- */
- public LearnerSnapshot beginSnapshot(boolean essential)
- throws SnapshotThrottleException, InterruptedException {
- int snapshotNumber;
-
- synchronized (snapCountSyncObject) {
- if (!essential
- && timeoutMillis > 0
- && snapsInProgress >= maxConcurrentSnapshots) {
- long timestamp = Time.currentElapsedTime();
- do {
- snapCountSyncObject.wait(timeoutMillis);
- } while (snapsInProgress >= maxConcurrentSnapshots
- && timestamp + timeoutMillis <
Time.currentElapsedTime());
- }
-
- if (essential || snapsInProgress < maxConcurrentSnapshots) {
- snapsInProgress++;
- snapshotNumber = snapsInProgress;
- } else {
- throw new SnapshotThrottleException(snapsInProgress + 1,
- maxConcurrentSnapshots);
- }
- }
-
- return new LearnerSnapshot(this, snapshotNumber, essential);
- }
-
- /**
- * Indicates that a snapshot has been completed.
- */
- public void endSnapshot() {
- int newCount;
- synchronized (snapCountSyncObject) {
- snapsInProgress--;
- newCount = snapsInProgress;
- snapCountSyncObject.notify();
- }
-
- if (newCount < 0) {
- String errorMsg =
- "endSnapshot() called incorrectly; current snapshot count
is "
- + newCount;
- LOG.error(errorMsg);
- }
- }
-}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncThrottler.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncThrottler.java
new file mode 100644
index 0000000..57a8c4f
--- /dev/null
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncThrottler.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.common.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to limit the number of concurrent syncs from a leader to
+ * observers and followers or from a follower to observers. {@link
LearnerHandler}
+ * objects should call {@link #beginSync(boolean)} before sending a sync and
+ * {@link #endSync()} after finishing, successfully or not.
+ *
+ */
+public class LearnerSyncThrottler {
+ private static final Logger LOG =
LoggerFactory.getLogger(LearnerSyncThrottler.class);
+
+ private final Object countSyncObject = new Object();
+ private int syncInProgress;
+
+ private volatile int maxConcurrentSyncs;
+
+ public enum SyncType {
+ DIFF,
+ SNAP
+ }
+
+ private final SyncType syncType;
+
+ /**
+ * Constructs a new instance limiting the concurrent number of syncs to
+ * <code>maxConcurrentSyncs</code>.
+ * @param maxConcurrentSyncs maximum concurrent number of syncs
+ * @param syncType either a snapshot sync or a txn-based diff sync
+ * @throws java.lang.IllegalArgumentException when
<code>maxConcurrentSyncs</code>
+ * is less than 1
+ */
+ public LearnerSyncThrottler(int maxConcurrentSyncs, SyncType syncType)
throws IllegalArgumentException {
+ if (maxConcurrentSyncs <= 0) {
+ String errorMsg = "maxConcurrentSyncs must be positive, was " +
+ maxConcurrentSyncs;
+ throw new IllegalArgumentException(errorMsg);
+ }
+
+ this.maxConcurrentSyncs = maxConcurrentSyncs;
+ this.syncType = syncType;
+
+ synchronized (countSyncObject) {
+ syncInProgress = 0;
+ }
+ }
+
+ /**
+ * Indicates that a new sync is about to be sent.
+ *
+ * @param essential if <code>true</code>, do not throw an exception even
+ * if throttling limit is reached
+ * @throws SyncThrottleException if throttling limit has been exceeded
+ * and <code>essential == false</code>;
+ * the check is immediate and never
+ * waits for a running sync to finish
+ * @throws InterruptedException declared in the signature, but this
+ * implementation never blocks and so
+ * does not throw it itself
+ */
+ protected void beginSync(boolean essential) throws SyncThrottleException,
InterruptedException {
+
+ synchronized (countSyncObject) {
+ if (essential || syncInProgress < maxConcurrentSyncs) {
+ syncInProgress++;
+ } else {
+ throw new SyncThrottleException(syncInProgress + 1,
+ maxConcurrentSyncs, syncType);
+ }
+ }
+ }
+
+ /**
+ * Indicates that a sync has been completed.
+ */
+ public void endSync() {
+ int newCount;
+ synchronized (countSyncObject) {
+ syncInProgress--;
+ newCount = syncInProgress;
+ countSyncObject.notify();
+ }
+
+ if (newCount < 0) {
+ String errorMsg =
+ "endSync() called incorrectly; current sync count is "
+ + newCount;
+ LOG.error(errorMsg);
+ }
+ }
+
+ public void setMaxConcurrentSyncs(int maxConcurrentSyncs) {
+ this.maxConcurrentSyncs = maxConcurrentSyncs;
+ }
+
+ public int getSyncInProgress(){
+ return syncInProgress;
+ }
+}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
index 368f5ef..98f1acd 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
@@ -63,7 +63,7 @@ import org.slf4j.LoggerFactory;
*
* The logic is quite a bit simpler than the corresponding logic in Leader
because it only hosts observers.
*/
-public class ObserverMaster implements LearnerMaster, Runnable {
+public class ObserverMaster extends LearnerMaster implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(ObserverMaster.class);
//Follower counter
@@ -93,20 +93,6 @@ public class ObserverMaster implements LearnerMaster,
Runnable {
// ensure ordering of revalidations returned to this learner
private final Object revalidateSessionLock = new Object();
- // Throttle when there are too many concurrent snapshots being sent to
observers
- private static final String MAX_CONCURRENT_SNAPSHOTS =
"zookeeper.leader.maxConcurrentSnapshots";
- private static final int maxConcurrentSnapshots;
-
- private static final String MAX_CONCURRENT_DIFFS =
"zookeeper.leader.maxConcurrentDiffs";
- private static final int maxConcurrentDiffs;
- static {
- maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS,
10);
- LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots);
-
- maxConcurrentDiffs = Integer.getInteger(MAX_CONCURRENT_DIFFS, 100);
- LOG.info(MAX_CONCURRENT_DIFFS + " = " + maxConcurrentDiffs);
- }
-
private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations =
new ConcurrentLinkedQueue<>();
static class Revalidation {
public final long sessionId;
@@ -137,9 +123,6 @@ public class ObserverMaster implements LearnerMaster,
Runnable {
}
}
- private final LearnerSnapshotThrottler learnerSnapshotThrottler =
- new LearnerSnapshotThrottler(maxConcurrentSnapshots);
-
private Thread thread;
private ServerSocket ss;
private boolean listenerRunning;
@@ -198,11 +181,6 @@ public class ObserverMaster implements LearnerMaster,
Runnable {
}
@Override
- public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
- return learnerSnapshotThrottler;
- }
-
- @Override
public void waitForStartup() throws InterruptedException {
// since this is done by an active follower, we don't need to wait for
anything
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SyncThrottleException.java
similarity index 55%
rename from
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java
rename to
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SyncThrottleException.java
index 77732f3..d5d6bd3 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SyncThrottleException.java
@@ -19,21 +19,23 @@
package org.apache.zookeeper.server.quorum;
/**
- * Thrown when a {@link Leader} has too many concurrent snapshots being sent
+ * Thrown when a {@link Leader} has too many concurrent syncs being sent
* to observers.
- *
- * @see LearnerSnapshotThrottler
+ *
+ * @see LearnerSyncThrottler
*
*/
-public class SnapshotThrottleException extends Exception {
+public class SyncThrottleException extends Exception {
private static final long serialVersionUID = 1L;
- public SnapshotThrottleException(int concurrentSnapshotNumber, int
throttleThreshold) {
- super(getMessage(concurrentSnapshotNumber, throttleThreshold));
+ public SyncThrottleException(int concurrentSyncNumber, int
throttleThreshold,
+ LearnerSyncThrottler.SyncType syncType) {
+ super(getMessage(concurrentSyncNumber, throttleThreshold, syncType));
}
- private static String getMessage(int concurrentSnapshotNumber, int
throttleThreshold) {
- return String.format("new snapshot would make %d concurrently in
progress; " +
- "maximum is %d", concurrentSnapshotNumber, throttleThreshold);
+ private static String getMessage(int concurrentSyncNumber, int
throttleThreshold,
+ LearnerSyncThrottler.SyncType syncType) {
+ return String.format("new %s sync would make %d concurrently in
progress; maximum is %d",
+ syncType.toString().toLowerCase(), concurrentSyncNumber,
throttleThreshold);
}
}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
deleted file mode 100644
index c2d65e3..0000000
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LearnerSnapshotThrottlerTest extends ZKTestCase {
- private static final Logger LOG =
- LoggerFactory.getLogger(LearnerSnapshotThrottlerTest.class);
-
- @Test(expected = SnapshotThrottleException.class)
- public void testTooManySnapshotsNonessential() throws Exception {
- LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
- for (int i = 0; i < 6; i++) {
- throttler.beginSnapshot(false);
- }
- }
-
- @Test(expected = SnapshotThrottleException.class)
- public void testTooManySnapshotsEssential() throws Exception {
- LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
- try {
- for (int i = 0; i < 6; i++) {
- throttler.beginSnapshot(true);
- }
- }
- catch (SnapshotThrottleException ex) {
- Assert.fail("essential snapshots should not be throttled");
- }
- throttler.endSnapshot();
- throttler.beginSnapshot(false);
- }
-
- @Test
- public void testNoThrottle() throws Exception {
- LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
- try {
- for (int i = 0; i < 6; i++) {
- throttler.beginSnapshot(true);
- }
- }
- catch (SnapshotThrottleException ex) {
- Assert.fail("essential snapshots should not be throttled");
- }
- throttler.endSnapshot();
- for (int i = 0; i < 5; i++) {
- throttler.endSnapshot();
- throttler.beginSnapshot(false);
- }
- }
-
- @Test
- public void testTryWithResourceNoThrottle() throws Exception {
- LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
- for (int i = 0; i < 3; i++) {
- LearnerSnapshot snapshot = throttler.beginSnapshot(false);
- try {
- Assert.assertFalse(snapshot.isEssential());
- Assert.assertEquals(1, snapshot.getConcurrentSnapshotNumber());
- } finally {
- snapshot.close();
- }
- }
- }
-
- @Test(expected = SnapshotThrottleException.class)
- public void testTryWithResourceThrottle() throws Exception {
- LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
- LearnerSnapshot outer = throttler.beginSnapshot(true);
- try {
- LearnerSnapshot inner = throttler.beginSnapshot(false);
- try {
- Assert.fail("shouldn't be able to have both snapshots open");
- } finally {
- inner.close();
- }
- } finally {
- outer.close();
- }
- }
-
- @Test
- public void testParallelNoThrottle() throws Exception {
- final int numThreads = 50;
-
- final LearnerSnapshotThrottler throttler = new
LearnerSnapshotThrottler(numThreads);
- ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
- final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
- final CountDownLatch snapshotProgressLatch = new
CountDownLatch(numThreads);
-
- List<Future<Boolean>> results = new
ArrayList<Future<Boolean>>(numThreads);
- for (int i = 0; i < numThreads; i++) {
- results.add(threadPool.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() {
- threadStartLatch.countDown();
- try {
- threadStartLatch.await();
-
- throttler.beginSnapshot(false);
-
- snapshotProgressLatch.countDown();
- snapshotProgressLatch.await();
-
- throttler.endSnapshot();
- }
- catch (Exception e) {
- return false;
- }
-
- return true;
- }
- }));
- }
-
- for (Future<Boolean> result : results) {
- Assert.assertTrue(result.get());
- }
- }
-
- @Test
- public void testPositiveTimeout() throws Exception {
- final LearnerSnapshotThrottler throttler = new
LearnerSnapshotThrottler(1, 200);
- ExecutorService threadPool = Executors.newFixedThreadPool(1);
-
- LearnerSnapshot first = throttler.beginSnapshot(false);
- final CountDownLatch snapshotProgressLatch = new CountDownLatch(1);
-
- Future<Boolean> result = threadPool.submit(new Callable<Boolean>() {
- @Override
- public Boolean call() {
- try {
- snapshotProgressLatch.countDown();
- LearnerSnapshot second = throttler.beginSnapshot(false);
- second.close();
- }
- catch (Exception e) {
- return false;
- }
-
- return true;
- }
- });
-
- snapshotProgressLatch.await();
-
- first.close();
-
- Assert.assertTrue(result.get());
- }
-
- @Test
- public void testHighContentionWithTimeout() throws Exception {
- int numThreads = 20;
-
- final LearnerSnapshotThrottler throttler = new
LearnerSnapshotThrottler(2, 5000);
- ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
- final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
-
- List<Future<Boolean>> results = new
ArrayList<Future<Boolean>>(numThreads);
- for (int i = 0; i < numThreads; i++) {
- results.add(threadPool.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() {
- threadStartLatch.countDown();
- try {
- threadStartLatch.await();
-
- LearnerSnapshot snap = throttler.beginSnapshot(false);
-
- int snapshotNumber =
snap.getConcurrentSnapshotNumber();
-
- throttler.endSnapshot();
-
- return snapshotNumber <= 2;
- }
- catch (Exception e) {
- LOG.error("Exception trying to begin snapshot", e);
- return false;
- }
- }
- }));
- }
-
- for (Future<Boolean> result : results) {
- Assert.assertTrue(result.get());
- }
- }
-}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerSyncThrottlerTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerSyncThrottlerTest.java
new file mode 100644
index 0000000..d3ce47b
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerSyncThrottlerTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class LearnerSyncThrottlerTest extends ZKTestCase {
+ private static final Logger LOG =
LoggerFactory.getLogger(LearnerSyncThrottlerTest.class);
+
+ private LearnerSyncThrottler.SyncType syncType;
+ public LearnerSyncThrottlerTest(LearnerSyncThrottler.SyncType syncType){
+ this.syncType = syncType;
+ }
+
+ @Parameterized.Parameters
+ public static Collection syncTypes() {
+ return Arrays.asList(new Object[][]{
+ {LearnerSyncThrottler.SyncType.DIFF},
{LearnerSyncThrottler.SyncType.SNAP}});
+ }
+ @Test(expected = SyncThrottleException.class)
+ public void testTooManySyncsNonessential() throws Exception {
+ LearnerSyncThrottler throttler =
+ new LearnerSyncThrottler(5, syncType);
+ for (int i = 0; i < 6; i++) {
+ throttler.beginSync(false);
+ }
+ }
+
+ @Test(expected = SyncThrottleException.class)
+ public void testTooManySyncsEssential() throws Exception {
+ LearnerSyncThrottler throttler =
+ new LearnerSyncThrottler(5, syncType);
+ try {
+ for (int i = 0; i < 6; i++) {
+ throttler.beginSync(true);
+ }
+ } catch (SyncThrottleException ex) {
+ Assert.fail("essential syncs should not be throttled");
+ }
+ throttler.endSync();
+ throttler.beginSync(false);
+ }
+
+ @Test
+ public void testNoThrottle() throws Exception {
+ LearnerSyncThrottler throttler =
+ new LearnerSyncThrottler(5, syncType);
+ try {
+ for (int i = 0; i < 6; i++) {
+ throttler.beginSync(true);
+ }
+ }
+ catch (SyncThrottleException ex) {
+ Assert.fail("essential syncs should not be throttled");
+ }
+ throttler.endSync();
+ for (int i = 0; i < 5; i++) {
+ throttler.endSync();
+ throttler.beginSync(false);
+ }
+ Assert.assertTrue("should get here without exception", true);
+ }
+
+ @Test
+ public void testTryWithResourceNoThrottle() throws Exception {
+ LearnerSyncThrottler throttler =
+ new LearnerSyncThrottler(1, syncType);
+ for (int i = 0; i < 3; i++) {
+ throttler.beginSync(false);
+ try {
+ Assert.assertEquals(1, throttler.getSyncInProgress());
+ } finally {
+ throttler.endSync();
+ }
+ }
+ }
+
+ @Test
+ public void testTryWithResourceThrottle() throws Exception {
+ LearnerSyncThrottler throttler =
+ new LearnerSyncThrottler(1, syncType);
+ try {
+ throttler.beginSync(true);
+ try {
+ throttler.beginSync(false);
+ Assert.fail("shouldn't be able to have both syncs open");
+ } catch (SyncThrottleException e) {
+ }
+ throttler.endSync();
+ } catch (SyncThrottleException e) {
+ Assert.fail("First sync shouldn't be throttled");
+ }
+ }
+
+ @Test
+ public void testParallelNoThrottle() {
+ final int numThreads = 50;
+
+ final LearnerSyncThrottler throttler =
+ new LearnerSyncThrottler(numThreads, syncType);
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+ final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
+ final CountDownLatch syncProgressLatch = new
CountDownLatch(numThreads);
+
+ List<Future<Boolean>> results = new
ArrayList<Future<Boolean>>(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ results.add(threadPool.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() {
+ threadStartLatch.countDown();
+ try {
+ threadStartLatch.await();
+
+ throttler.beginSync(false);
+
+ syncProgressLatch.countDown();
+ syncProgressLatch.await();
+
+ throttler.endSync();
+ } catch (Exception e) {
+ return false;
+ }
+
+ return true;
+ }
+ }));
+ }
+
+ try {
+ for (Future<Boolean> result : results) {
+ Assert.assertTrue(result.get());
+ }
+ } catch (Exception e){
+
+ } finally {
+ threadPool.shutdown();
+ }
+ }
+}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 9c4d276..a0bbc09 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -1203,7 +1203,7 @@ public class QuorumPeerMainTest extends
QuorumPeerTestBase {
});
// 5. on the customized leader catch the beginSnapshot call in
- // LearnerSnapshotThrottler to set the node to value v2,
+ // LearnerSyncThrottler to set the node to value v2,
// wait it hit data tree
leaderQuorumPeer.setBeginSnapshotListener(new
BeginSnapshotListener() {
@Override
@@ -1756,6 +1756,7 @@ public class QuorumPeerMainTest extends
QuorumPeerTestBase {
static class CustomQuorumPeer extends QuorumPeer {
private Context context;
+ private LearnerSyncThrottler throttler = null;
private StartForwardingListener startForwardingListener;
private BeginSnapshotListener beginSnapshotListener;
@@ -1818,20 +1819,21 @@ public class QuorumPeerMainTest extends
QuorumPeerTestBase {
}
@Override
- public LearnerSnapshotThrottler createLearnerSnapshotThrottler(
- int maxConcurrentSnapshots, long
maxConcurrentSnapshotTimeout) {
- return new LearnerSnapshotThrottler(
- maxConcurrentSnapshots,
maxConcurrentSnapshotTimeout) {
-
- @Override
- public LearnerSnapshot beginSnapshot(boolean essential)
- throws SnapshotThrottleException,
InterruptedException {
- if (beginSnapshotListener != null) {
- beginSnapshotListener.start();
+ public LearnerSyncThrottler getLearnerSnapSyncThrottler() {
+ if (throttler == null){
+ throttler = new
LearnerSyncThrottler(getMaxConcurrentSnapSyncs(),
+ LearnerSyncThrottler.SyncType.SNAP){
+ @Override
+ public void beginSync(boolean essential)
+ throws SyncThrottleException,
InterruptedException {
+ if (beginSnapshotListener != null) {
+ beginSnapshotListener.start();
+ }
+ super.beginSync(essential);
}
- return super.beginSnapshot(essential);
- }
- };
+ };
+ }
+ return throttler;
}
};
}