Author: fpj
Date: Mon Dec  3 21:38:08 2012
New Revision: 1416677

URL: http://svn.apache.org/viewvc?rev=1416677&view=rev
Log:
BOOKKEEPER-399: Let hub server configure write quorum from ack quorum. (sijie via fpj)


Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1416677&r1=1416676&r2=1416677&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Dec  3 21:38:08 2012
@@ -256,6 +256,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-389: add documentation for message filter. (sijie via ivank)
 
+       BOOKKEEPER-399: Let hub server configure write quorum from ack quorum. (sijie via fpj)
+
       hedwig-client:
 
         BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1416677&r1=1416676&r2=1416677&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Mon Dec  3 21:38:08 2012
@@ -59,7 +59,10 @@ public class ServerConfiguration extends
     protected final static String INTER_REGION_SSL_ENABLED = "inter_region_ssl_enabled";
     protected final static String MESSAGES_CONSUMED_THREAD_RUN_INTERVAL = "messages_consumed_thread_run_interval";
     protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
+    @Deprecated
     protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
+    protected final static String BK_WRITE_QUORUM_SIZE = "bk_write_quorum_size";
+    protected final static String BK_ACK_QUORUM_SIZE = "bk_ack_quorum_size";
     protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval";
     protected final static String DEFAULT_MESSAGE_WINDOW_SIZE =
         "default_message_window_size";
@@ -302,11 +305,38 @@ public class ServerConfiguration extends
     // This parameter is used when Bookkeeper is the persistence store
     // and indicates what the quorum size is (i.e. how many redundant
     // copies of each ledger entry is written).
-    public int getBkQuorumSize() {
+    protected int getBkQuorumSize() {
         return conf.getInt(BK_QUORUM_SIZE, 2);
     }
 
     /**
+     * Get the write quorum size for BookKeeper client, which is used to
+     * indicate how many redundant copies of each ledger entry is written.
+     *
+     * @return write quorum size for BookKeeper client.
+     */
+    public int getBkWriteQuorumSize() {
+        if (conf.containsKey(BK_WRITE_QUORUM_SIZE)) {
+            return conf.getInt(BK_WRITE_QUORUM_SIZE, 2);
+        } else {
+            return getBkQuorumSize();
+        }
+    }
+
+    /**
+     * Get the ack quorum size for BookKeeper client.
+     *
+     * @return ack quorum size for BookKeeper client.
+     */
+    public int getBkAckQuorumSize() {
+        if (conf.containsKey(BK_ACK_QUORUM_SIZE)) {
+            return conf.getInt(BK_ACK_QUORUM_SIZE, 2);
+        } else {
+            return getBkQuorumSize();
+        }
+    }
+
+    /**
      * This parameter is used when BookKeeper is the persistence storage,
      * and indicates when the number of entries stored in a ledger reach
      * the threshold, hub server will open a new ledger to write.
@@ -336,9 +366,14 @@ public class ServerConfiguration extends
             }
         }
         // Validate that the Bookkeeper ensemble size >= quorum size.
-        if (getBkEnsembleSize() < getBkQuorumSize()) {
+        if (getBkEnsembleSize() < getBkWriteQuorumSize()) {
             throw new ConfigurationException("BK ensemble size (" + getBkEnsembleSize()
-                                             + ") is less than the quorum size (" + getBkQuorumSize() + ")");
+                                             + ") is less than the write quorum size (" + getBkWriteQuorumSize() + ")");
+        }
+
+        if (getBkWriteQuorumSize() < getBkAckQuorumSize()) {
+            throw new ConfigurationException("BK write quorum size (" + getBkWriteQuorumSize()
+                                             + ") is less than the ack quorum size (" + getBkAckQuorumSize() + ")");
         }
 
         // add other checks here

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1416677&r1=1416676&r2=1416677&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Mon Dec  3 21:38:08 2012
@@ -1033,7 +1033,8 @@ public class BookkeeperPersistenceManage
                             final Version expectedVersionOfLedgersNode, final TopicInfo topicInfo,
                             final long startSeqId, final boolean changeLedger,
                             final Callback<Void> cb, final Object ctx) {
-        bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkQuorumSize(), DigestType.CRC32, passwd,
+        bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkWriteQuorumSize(),
+                             cfg.getBkAckQuorumSize(), DigestType.CRC32, passwd,
         new SafeAsynBKCallback.CreateCallback() {
             AtomicBoolean processed = new AtomicBoolean(false);
 


Reply via email to