This is an automated email from the ASF dual-hosted git repository.
symat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 05cd214 ZOOKEEPER-3796: Skip Learner Request made to ObserverMaster
from going
05cd214 is described below
commit 05cd214a0cc9c870de373b54cfeb47a2a75efd28
Author: Mayank Tuteja <[email protected]>
AuthorDate: Tue May 5 19:20:35 2020 +0000
ZOOKEEPER-3796: Skip Learner Request made to ObserverMaster from going
... to next processor
Author: mayank99 <[email protected]>
Author: mayank99fb <[email protected]>
Reviewers: Andor Molnar <[email protected]>, Enrico Olivelli
<[email protected]>, Mate Szalay-Beko <[email protected]>
Closes #1322 from mayank99fb/ZOOKEEPER-3796
---
.../src/main/resources/markdown/zookeeperAdmin.md | 9 +
.../java/org/apache/zookeeper/server/Request.java | 5 +
.../org/apache/zookeeper/server/ServerMetrics.java | 6 +
.../server/quorum/FollowerRequestProcessor.java | 19 +-
.../server/FollowerRequestProcessorTest.java | 89 ++++++++++
.../apache/zookeeper/test/ObserverMasterTest.java | 196 +--------------------
.../zookeeper/test/ObserverMasterTestBase.java | 120 +++++++++++++
7 files changed, 249 insertions(+), 195 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 6c616d1..2a9d7d3 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1653,6 +1653,15 @@ New features that are currently considered experimental.
values and see changes from other clients. See
ZOOKEEPER-784 for more details.
+* *zookeeper.follower.skipLearnerRequestToNextProcessor* :
+ (Java system property:
**zookeeper.follower.skipLearnerRequestToNextProcessor**)
+ When your cluster has observers that are connected to an ObserverMaster,
turning on this flag might help
+ you reduce some memory pressure on the ObserverMaster. If your cluster
doesn't have any observers, or
+ they are not connected to an ObserverMaster, or your observers don't make
many writes, then using this flag
+ won't help you.
+ Currently, this change is guarded by the flag to help us gain more
confidence in the memory gains.
+ In the long run, we might want to remove this flag and make its behavior the
default code path.
+
<a name="Unsafe+Options"></a>
#### Unsafe Options
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 43a68ac..4296471 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -27,6 +27,7 @@ import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.metrics.Summary;
import org.apache.zookeeper.metrics.SummarySet;
+import org.apache.zookeeper.server.quorum.LearnerHandler;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.AuthUtil;
import org.apache.zookeeper.txn.TxnDigest;
@@ -490,4 +491,8 @@ public class Request {
public void setTxnDigest(TxnDigest txnDigest) {
this.txnDigest = txnDigest;
}
+
+ public boolean isFromLearner() {
+ return owner instanceof LearnerHandler;
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 7ea7010..36a65df 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -237,7 +237,10 @@ public final class ServerMetrics {
CNXN_CLOSED_WITHOUT_ZK_SERVER_RUNNING =
metricsContext.getCounter("cnxn_closed_without_zk_server_running");
+ SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT =
metricsContext.getCounter("skip_learner_request_to_next_processor_count");
+
SOCKET_CLOSING_TIME = metricsContext.getSummary("socket_closing_time",
DetailLevel.BASIC);
+
}
/**
@@ -458,8 +461,11 @@ public final class ServerMetrics {
public final Counter CNXN_CLOSED_WITHOUT_ZK_SERVER_RUNNING;
+ public final Counter SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT;
+
public final Summary SOCKET_CLOSING_TIME;
+
private final MetricsProvider metricsProvider;
public void resetAll() {
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
index db51aee..90c4d49 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
@@ -24,6 +24,7 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZooKeeperCriticalThread;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.txn.ErrorTxn;
@@ -38,6 +39,10 @@ public class FollowerRequestProcessor extends
ZooKeeperCriticalThread implements
private static final Logger LOG =
LoggerFactory.getLogger(FollowerRequestProcessor.class);
+ public static final String SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR =
"zookeeper.follower.skipLearnerRequestToNextProcessor";
+
+ private final boolean skipLearnerRequestToNextProcessor;
+
FollowerZooKeeperServer zks;
RequestProcessor nextProcessor;
@@ -50,6 +55,9 @@ public class FollowerRequestProcessor extends
ZooKeeperCriticalThread implements
super("FollowerRequestProcessor:" + zks.getServerId(),
zks.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
+ this.skipLearnerRequestToNextProcessor =
Boolean.getBoolean(SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR);
+ LOG.info("Initialized FollowerRequestProcessor with {} as {}",
SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR,
+ skipLearnerRequestToNextProcessor);
}
@Override
@@ -72,7 +80,8 @@ public class FollowerRequestProcessor extends
ZooKeeperCriticalThread implements
// We want to queue the request to be processed before we
submit
// the request to the leader so that we are ready to receive
// the response
- nextProcessor.processRequest(request);
+ maybeSendRequestToNextProcessor(request);
+
if (request.isThrottled()) {
continue;
}
@@ -115,6 +124,14 @@ public class FollowerRequestProcessor extends
ZooKeeperCriticalThread implements
LOG.info("FollowerRequestProcessor exited loop!");
}
+ private void maybeSendRequestToNextProcessor(Request request) throws
RequestProcessorException {
+ if (skipLearnerRequestToNextProcessor && request.isFromLearner()) {
+
ServerMetrics.getMetrics().SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT.add(1);
+ } else {
+ nextProcessor.processRequest(request);
+ }
+ }
+
public void processRequest(Request request) {
processRequest(request, true);
}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/FollowerRequestProcessorTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FollowerRequestProcessorTest.java
new file mode 100644
index 0000000..dd81c77
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FollowerRequestProcessorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.util.PortForwarder;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ObserverMasterTestBase;
+import org.junit.After;
+import org.junit.Test;
+
+public class FollowerRequestProcessorTest extends ObserverMasterTestBase {
+
+ private PortForwarder forwarder;
+
+ @Test
+ public void
testFollowerRequestProcessorSkipsLearnerRequestToNextProcessor() throws
Exception {
+ setupTestObserverServer("true");
+
+ zk.create("/testFollowerSkipNextAProcessor", "test".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ assertEquals("test", new
String(zk.getData("/testFollowerSkipNextAProcessor", null, null)));
+ assertEquals(1L,
ServerMetrics.getMetrics().SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT.get());
+ }
+
+ @Test
+ public void
testFollowerRequestProcessorSendsLearnerRequestToNextProcessor() throws
Exception {
+ setupTestObserverServer("false");
+
+ zk.create("/testFollowerSkipNextAProcessor", "test".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ assertEquals("test", new
String(zk.getData("/testFollowerSkipNextAProcessor", null, null)));
+ assertEquals(0L,
ServerMetrics.getMetrics().SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT.get());
+ }
+
+ private void setupTestObserverServer(String
skipLearnerRequestToNextProcessor) throws Exception {
+
System.setProperty(FollowerRequestProcessor.SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR,
skipLearnerRequestToNextProcessor);
+
+ // Setup Ensemble with observer master port so that observer connects
with Observer master and not the leader
+ final int OM_PROXY_PORT = PortAssignment.unique();
+ forwarder = setUp(OM_PROXY_PORT, true);
+
+ q3.start();
+ assertTrue(
+ "waiting for server 3 being up",
+ ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS,
CONNECTION_TIMEOUT));
+
+ // Connect with observer zookeeper
+ zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS,
ClientBase.CONNECTION_TIMEOUT, this);
+ waitForOne(zk, States.CONNECTED);
+
+ // Clear all service metrics collected so far
+ ServerMetrics.getMetrics().resetAll();
+ }
+
+ @After
+ public void cleanup() throws Exception {
+
System.setProperty(FollowerRequestProcessor.SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR,
"false");
+
+ shutdown();
+ if (forwarder != null) {
+ forwarder.shutdown();
+ }
+ }
+}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
index 620953a..cfac042 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
@@ -50,8 +50,6 @@ import org.apache.zookeeper.DummyWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
@@ -60,10 +58,7 @@ import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
import org.apache.zookeeper.server.admin.Commands;
-import org.apache.zookeeper.server.quorum.DelayRequestProcessor;
-import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.zookeeper.server.util.PortForwarder;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -72,7 +67,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
-public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher {
+public class ObserverMasterTest extends ObserverMasterTestBase {
protected static final Logger LOG =
LoggerFactory.getLogger(ObserverMasterTest.class);
@@ -87,184 +82,8 @@ public class ObserverMasterTest extends QuorumPeerTestBase
implements Watcher {
private Boolean testObserverMaster;
- private CountDownLatch latch;
- ZooKeeper zk;
- private WatchedEvent lastEvent = null;
-
- private int CLIENT_PORT_QP1;
- private int CLIENT_PORT_QP2;
- private int CLIENT_PORT_OBS;
- private int OM_PORT;
- private MainThread q1;
- private MainThread q2;
- private MainThread q3;
-
private PortForwarder setUp(final int omProxyPort) throws IOException {
- ClientBase.setupTestEnv();
-
- final int PORT_QP1 = PortAssignment.unique();
- final int PORT_QP2 = PortAssignment.unique();
- final int PORT_OBS = PortAssignment.unique();
- final int PORT_QP_LE1 = PortAssignment.unique();
- final int PORT_QP_LE2 = PortAssignment.unique();
- final int PORT_OBS_LE = PortAssignment.unique();
-
- CLIENT_PORT_QP1 = PortAssignment.unique();
- CLIENT_PORT_QP2 = PortAssignment.unique();
- CLIENT_PORT_OBS = PortAssignment.unique();
-
- OM_PORT = PortAssignment.unique();
-
- String quorumCfgSection = "server.1=127.0.0.1:" + (PORT_QP1) + ":" +
(PORT_QP_LE1) + ";" + CLIENT_PORT_QP1
- + "\nserver.2=127.0.0.1:" + (PORT_QP2) + ":"
+ (PORT_QP_LE2) + ";" + CLIENT_PORT_QP2
- + "\nserver.3=127.0.0.1:" + (PORT_OBS) + ":"
+ (PORT_OBS_LE) + ":observer" + ";" + CLIENT_PORT_OBS;
- String extraCfgs = testObserverMaster
- ? String.format("observerMasterPort=%d%n", OM_PORT)
- : "";
- String extraCfgsObs = testObserverMaster
- ? String.format("observerMasterPort=%d%n", omProxyPort <= 0 ?
OM_PORT : omProxyPort)
- : "";
-
- PortForwarder forwarder = null;
- if (testObserverMaster && omProxyPort >= 0) {
- forwarder = new PortForwarder(omProxyPort, OM_PORT);
- }
-
- q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection, extraCfgs);
- q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, extraCfgs);
- q3 = new MainThread(3, CLIENT_PORT_OBS, quorumCfgSection,
extraCfgsObs);
- q1.start();
- q2.start();
- assertTrue(
- "waiting for server 1 being up",
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
CONNECTION_TIMEOUT));
- assertTrue(
- "waiting for server 2 being up",
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
CONNECTION_TIMEOUT));
- return forwarder;
- }
-
- private void shutdown() throws InterruptedException {
- LOG.info("Shutting down all servers");
- zk.close();
-
- q1.shutdown();
- q2.shutdown();
- q3.shutdown();
-
- assertTrue(
- "Waiting for server 1 to shut down",
- ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1,
ClientBase.CONNECTION_TIMEOUT));
- assertTrue(
- "Waiting for server 2 to shut down",
- ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2,
ClientBase.CONNECTION_TIMEOUT));
- assertTrue(
- "Waiting for server 3 to shut down",
- ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_OBS,
ClientBase.CONNECTION_TIMEOUT));
- }
-
- @Test
- public void testLaggingObserverMaster() throws Exception {
- final int OM_PROXY_PORT = PortAssignment.unique();
- PortForwarder forwarder = setUp(OM_PROXY_PORT);
-
- // find the leader and observer master
- int leaderPort;
- MainThread leader;
- MainThread follower;
- if (q1.getQuorumPeer().leader != null) {
- leaderPort = CLIENT_PORT_QP1;
- leader = q1;
- follower = q2;
- } else if (q2.getQuorumPeer().leader != null) {
- leaderPort = CLIENT_PORT_QP2;
- leader = q2;
- follower = q1;
- } else {
- throw new RuntimeException("No leader");
- }
-
- // ensure the observer master has commits in the queue before observer
sync
- zk = new ZooKeeper("127.0.0.1:" + leaderPort,
ClientBase.CONNECTION_TIMEOUT, this);
- for (int i = 0; i < 10; i++) {
- zk.create("/bulk" + i, ("initial data of some size").getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- zk.close();
-
- q3.start();
- assertTrue(
- "waiting for server 3 being up",
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS,
CONNECTION_TIMEOUT));
-
- latch = new CountDownLatch(1);
- zk = new ZooKeeper("127.0.0.1:" + leaderPort,
ClientBase.CONNECTION_TIMEOUT, this);
- latch.await();
- assertEquals(zk.getState(), States.CONNECTED);
-
- zk.create("/init", "first".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- final long lastLoggedZxid = leader.getQuorumPeer().getLastLoggedZxid();
-
- // wait for change to propagate
- waitFor("Timeout waiting for observer sync", new WaitForCondition() {
- public boolean evaluate() {
- return lastLoggedZxid ==
q3.getQuorumPeer().getLastLoggedZxid();
- }
- }, 30);
-
- // simulate network fault
- if (forwarder != null) {
- forwarder.shutdown();
- }
-
- for (int i = 0; i < 10; i++) {
- zk.create("/basic" + i, "second".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- }
-
- DelayRequestProcessor delayRequestProcessor = null;
- if (testObserverMaster) {
- FollowerZooKeeperServer followerZooKeeperServer =
(FollowerZooKeeperServer) follower.getQuorumPeer().getActiveServer();
- delayRequestProcessor =
DelayRequestProcessor.injectDelayRequestProcessor(followerZooKeeperServer);
- }
-
- zk.create("/target1", "third".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- zk.create("/target2", "third".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
-
- LOG.info(
- "observer zxid {}{} leader zxid {}",
- Long.toHexString(q3.getQuorumPeer().getLastLoggedZxid()),
- (testObserverMaster ? "" : " observer master zxid " +
Long.toHexString(follower.getQuorumPeer().getLastLoggedZxid())),
- Long.toHexString(leader.getQuorumPeer().getLastLoggedZxid()));
-
- // restore network
- forwarder = testObserverMaster ? new PortForwarder(OM_PROXY_PORT,
OM_PORT) : null;
-
- assertTrue(
- "waiting for server 3 being up",
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS,
CONNECTION_TIMEOUT));
- assertNotNull("Leader switched", leader.getQuorumPeer().leader);
-
- if (delayRequestProcessor != null) {
- delayRequestProcessor.unblockQueue();
- }
-
- latch = new CountDownLatch(1);
- ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS,
ClientBase.CONNECTION_TIMEOUT, this);
- latch.await();
- zk.create("/finalop", "fourth".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
-
- assertEquals("first", new String(obsZk.getData("/init", null, null)));
- assertEquals("third", new String(obsZk.getData("/target1", null,
null)));
-
- obsZk.close();
- shutdown();
-
- try {
- if (forwarder != null) {
- forwarder.shutdown();
- }
- } catch (Exception e) {
- // ignore
- }
+ return setUp(omProxyPort, testObserverMaster);
}
/**
@@ -678,17 +497,6 @@ public class ObserverMasterTest extends QuorumPeerTestBase
implements Watcher {
s1.shutdown();
}
- /**
- * Implementation of watcher interface.
- */
- public void process(WatchedEvent event) {
- lastEvent = event;
- if (latch != null) {
- latch.countDown();
- }
- LOG.info("Latch got event :: {}", event);
- }
-
class AsyncWriter implements Runnable {
private final ZooKeeper client;
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTestBase.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTestBase.java
new file mode 100644
index 0000000..505378a
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTestBase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.server.util.PortForwarder;
+
+public class ObserverMasterTestBase extends QuorumPeerTestBase implements
Watcher {
+
+ protected CountDownLatch latch;
+ protected ZooKeeper zk;
+ protected int CLIENT_PORT_QP1;
+ protected int CLIENT_PORT_QP2;
+ protected int CLIENT_PORT_OBS;
+ protected int OM_PORT;
+ protected MainThread q1;
+ protected MainThread q2;
+ protected MainThread q3;
+ protected WatchedEvent lastEvent = null;
+
+ protected PortForwarder setUp(final int omProxyPort, final Boolean
testObserverMaster) throws IOException {
+ ClientBase.setupTestEnv();
+ final int PORT_QP1 = PortAssignment.unique();
+ final int PORT_QP2 = PortAssignment.unique();
+ final int PORT_OBS = PortAssignment.unique();
+ final int PORT_QP_LE1 = PortAssignment.unique();
+ final int PORT_QP_LE2 = PortAssignment.unique();
+ final int PORT_OBS_LE = PortAssignment.unique();
+
+ CLIENT_PORT_QP1 = PortAssignment.unique();
+ CLIENT_PORT_QP2 = PortAssignment.unique();
+ CLIENT_PORT_OBS = PortAssignment.unique();
+
+ OM_PORT = PortAssignment.unique();
+
+ String quorumCfgSection =
+ "server.1=127.0.0.1:" + (PORT_QP1)
+ + ":" + (PORT_QP_LE1) + ";" + CLIENT_PORT_QP1
+ + "\nserver.2=127.0.0.1:" + (PORT_QP2)
+ + ":" + (PORT_QP_LE2) + ";" + CLIENT_PORT_QP2
+ + "\nserver.3=127.0.0.1:" + (PORT_OBS)
+ + ":" + (PORT_OBS_LE) + ":observer" + ";" +
CLIENT_PORT_OBS;
+
+ String extraCfgs = testObserverMaster ?
String.format("observerMasterPort=%d%n", OM_PORT) : "";
+ String extraCfgsObs = testObserverMaster ?
String.format("observerMasterPort=%d%n", omProxyPort <= 0 ? OM_PORT :
omProxyPort) : "";
+
+ PortForwarder forwarder = null;
+ if (testObserverMaster && omProxyPort >= 0) {
+ forwarder = new PortForwarder(omProxyPort, OM_PORT);
+ }
+
+ q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection, extraCfgs);
+ q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, extraCfgs);
+ q3 = new MainThread(3, CLIENT_PORT_OBS, quorumCfgSection,
extraCfgsObs);
+ q1.start();
+ q2.start();
+ assertTrue(
+ "waiting for server 1 being up",
+ ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
CONNECTION_TIMEOUT));
+ assertTrue(
+ "waiting for server 2 being up",
+ ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
CONNECTION_TIMEOUT));
+ return forwarder;
+ }
+
+ protected void shutdown() throws InterruptedException {
+ LOG.info("Shutting down all servers");
+
+ zk.close();
+
+ q1.shutdown();
+ q2.shutdown();
+ q3.shutdown();
+
+ assertTrue(
+ "Waiting for server 1 to shut down",
+ ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1,
ClientBase.CONNECTION_TIMEOUT));
+ assertTrue(
+ "Waiting for server 2 to shut down",
+ ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2,
ClientBase.CONNECTION_TIMEOUT));
+ assertTrue(
+ "Waiting for server 3 to shut down",
+ ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_OBS,
ClientBase.CONNECTION_TIMEOUT));
+ }
+
+ /**
+ * Implementation of watcher interface.
+ */
+ public void process(WatchedEvent event) {
+ lastEvent = event;
+ if (latch != null) {
+ latch.countDown();
+ }
+ LOG.info("Latch got event :: {}", event);
+ }
+}