Author: fpj
Date: Mon Dec 3 21:38:08 2012
New Revision: 1416677
URL: http://svn.apache.org/viewvc?rev=1416677&view=rev
Log:
BOOKKEEPER-399: Let hub server configure write quorum from ack quorum. (sijie
via fpj)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1416677&r1=1416676&r2=1416677&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Dec 3 21:38:08 2012
@@ -256,6 +256,8 @@ Trunk (unreleased changes)
BOOKKEEPER-389: add documentation for message filter. (sijie via ivank)
+ BOOKKEEPER-399: Let hub server configure write quorum from ack quorum.
(sijie via fpj)
+
hedwig-client:
BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via
sijie)
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1416677&r1=1416676&r2=1416677&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
Mon Dec 3 21:38:08 2012
@@ -59,7 +59,10 @@ public class ServerConfiguration extends
protected final static String INTER_REGION_SSL_ENABLED =
"inter_region_ssl_enabled";
protected final static String MESSAGES_CONSUMED_THREAD_RUN_INTERVAL =
"messages_consumed_thread_run_interval";
protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
+ @Deprecated
protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
+ protected final static String BK_WRITE_QUORUM_SIZE =
"bk_write_quorum_size";
+ protected final static String BK_ACK_QUORUM_SIZE = "bk_ack_quorum_size";
protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL =
"retry_remote_subscribe_thread_run_interval";
protected final static String DEFAULT_MESSAGE_WINDOW_SIZE =
"default_message_window_size";
@@ -302,11 +305,38 @@ public class ServerConfiguration extends
// This parameter is used when Bookkeeper is the persistence store
// and indicates what the quorum size is (i.e. how many redundant
// copies of each ledger entry is written).
- public int getBkQuorumSize() {
+ protected int getBkQuorumSize() {
return conf.getInt(BK_QUORUM_SIZE, 2);
}
/**
+ * Get the write quorum size for BookKeeper client, which is used to
+ * indicate how many redundant copies of each ledger entry is written.
+ *
+ * @return write quorum size for BookKeeper client.
+ */
+ public int getBkWriteQuorumSize() {
+ if (conf.containsKey(BK_WRITE_QUORUM_SIZE)) {
+ return conf.getInt(BK_WRITE_QUORUM_SIZE, 2);
+ } else {
+ return getBkQuorumSize();
+ }
+ }
+
+ /**
+ * Get the ack quorum size for BookKeeper client.
+ *
+ * @return ack quorum size for BookKeeper client.
+ */
+ public int getBkAckQuorumSize() {
+ if (conf.containsKey(BK_ACK_QUORUM_SIZE)) {
+ return conf.getInt(BK_ACK_QUORUM_SIZE, 2);
+ } else {
+ return getBkQuorumSize();
+ }
+ }
+
+ /**
* This parameter is used when BookKeeper is the persistence storage,
* and indicates when the number of entries stored in a ledger reach
* the threshold, hub server will open a new ledger to write.
@@ -336,9 +366,14 @@ public class ServerConfiguration extends
}
}
// Validate that the Bookkeeper ensemble size >= quorum size.
- if (getBkEnsembleSize() < getBkQuorumSize()) {
+ if (getBkEnsembleSize() < getBkWriteQuorumSize()) {
throw new ConfigurationException("BK ensemble size (" +
getBkEnsembleSize()
- + ") is less than the quorum size
(" + getBkQuorumSize() + ")");
+ + ") is less than the write
quorum size (" + getBkWriteQuorumSize() + ")");
+ }
+
+ if (getBkWriteQuorumSize() < getBkAckQuorumSize()) {
+ throw new ConfigurationException("BK write quorum size (" +
getBkWriteQuorumSize()
+ + ") is less than the ack quorum
size (" + getBkAckQuorumSize() + ")");
}
// add other checks here
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1416677&r1=1416676&r2=1416677&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
Mon Dec 3 21:38:08 2012
@@ -1033,7 +1033,8 @@ public class BookkeeperPersistenceManage
final Version expectedVersionOfLedgersNode, final
TopicInfo topicInfo,
final long startSeqId, final boolean changeLedger,
final Callback<Void> cb, final Object ctx) {
- bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkQuorumSize(),
DigestType.CRC32, passwd,
+ bk.asyncCreateLedger(cfg.getBkEnsembleSize(),
cfg.getBkWriteQuorumSize(),
+ cfg.getBkAckQuorumSize(), DigestType.CRC32,
passwd,
new SafeAsynBKCallback.CreateCallback() {
AtomicBoolean processed = new AtomicBoolean(false);