This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new bc9afbf8e ZOOKEEPER-4712: Fix partially shutdown of ZooKeeperServer and its processors
bc9afbf8e is described below
commit bc9afbf8ef1bc6156643d3d05c87fcf8411e9d8f
Author: Jon Marius Venstad <[email protected]>
AuthorDate: Fri Sep 20 21:10:08 2024 +0200
ZOOKEEPER-4712: Fix partially shutdown of ZooKeeperServer and its processors
Reviewers: anmolnar, kezhuw
Author: jonmv
Closes #2154 from jonmv/jonmv/ZOOKEEPER-4541-take-2
---
.../org/apache/zookeeper/server/ZKDatabase.java | 2 +-
.../apache/zookeeper/server/ZooKeeperServer.java | 70 +++++++++---------
.../zookeeper/server/ZooKeeperServerMain.java | 4 +-
.../server/persistence/FileTxnSnapLog.java | 2 +-
.../server/quorum/LeaderZooKeeperServer.java | 4 +-
.../apache/zookeeper/server/quorum/Learner.java | 28 +++----
.../server/quorum/LearnerZooKeeperServer.java | 17 ++---
.../server/quorum/ObserverZooKeeperServer.java | 14 +---
.../server/quorum/ReadOnlyZooKeeperServer.java | 8 +-
.../server/quorum/SendAckRequestProcessor.java | 4 +-
.../server/ZooKeeperServerShutdownTest.java | 85 ++++++++++++++++++++++
.../zookeeper/server/quorum/WatchLeakTest.java | 2 +-
12 files changed, 152 insertions(+), 88 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index c1e0b4834..c27ac6aa8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -294,7 +294,7 @@ public class ZKDatabase {
}
/**
- * Fast forward the database adding transactions from the committed log
into memory.
+ * Fast-forward the database adding transactions from the committed log
into memory.
* @return the last valid zxid.
* @throws IOException
*/
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index af4147afd..6740f6d52 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -900,7 +900,7 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
* @return true if the server is running or server hits an error, false
* otherwise.
*/
- protected boolean canShutdown() {
+ private boolean canShutdown() {
return state == State.RUNNING || state == State.ERROR;
}
@@ -911,27 +911,49 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
return state == State.RUNNING;
}
- public void shutdown() {
+ public final void shutdown() {
shutdown(false);
}
/**
* Shut down the server instance
- * @param fullyShutDown true if another server using the same database
will not replace this one in the same process
+ * @param fullyShutDown true when no other server will use the same
database to replace this one
*/
- public synchronized void shutdown(boolean fullyShutDown) {
- if (!canShutdown()) {
- if (fullyShutDown && zkDb != null) {
- zkDb.clear();
+ public final synchronized void shutdown(boolean fullyShutDown) {
+ if (canShutdown()) {
+ LOG.info("Shutting down");
+
+ shutdownComponents();
+
+ if (zkDb != null && !fullyShutDown) {
+ // There is no need to clear the database if we are going to
reuse it:
+ // * When a new quorum is established we can still apply the
diff
+ // on top of the same zkDb data
+ // * If we fetch a new snapshot from leader, the zkDb will be
+ // cleared anyway before loading the snapshot
+ try {
+ // This will fast-forward the database to the last
recorded transaction
+ zkDb.fastForwardDataBase();
+ } catch (IOException e) {
+ LOG.error("Error updating DB", e);
+ fullyShutDown = true;
+ }
}
+ setState(State.SHUTDOWN);
+ } else {
LOG.debug("ZooKeeper server is not running, so not proceeding to
shutdown!");
- return;
}
- LOG.info("shutting down");
-
- // new RuntimeException("Calling shutdown").printStackTrace();
- setState(State.SHUTDOWN);
+ if (zkDb != null && fullyShutDown) {
+ zkDb.clear();
+ }
+ }
+ /**
+ * @implNote
+ * Shuts down components owned by this class;
+ * remember to call super.shutdownComponents() when overriding!
+ */
+ protected void shutdownComponents() {
// unregister all metrics that are keeping a strong reference to this
object
// subclasses will do their specific clean up
unregisterMetrics();
@@ -940,9 +962,8 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
requestThrottler.shutdown();
}
- // Since sessionTracker and syncThreads poll we just have to
- // set running to false and they will detect it during the poll
- // interval.
+ // Since sessionTracker and syncThreads poll we just have to set
running to false,
+ // and they will detect it during the poll interval.
if (sessionTracker != null) {
sessionTracker.shutdown();
}
@@ -953,25 +974,6 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
jvmPauseMonitor.serviceStop();
}
- if (zkDb != null) {
- if (fullyShutDown) {
- zkDb.clear();
- } else {
- // else there is no need to clear the database
- // * When a new quorum is established we can still apply the
diff
- // on top of the same zkDb data
- // * If we fetch a new snapshot from leader, the zkDb will be
- // cleared anyway before loading the snapshot
- try {
- //This will fast forward the database to the latest
recorded transactions
- zkDb.fastForwardDataBase();
- } catch (IOException e) {
- LOG.error("Error updating DB", e);
- zkDb.clear();
- }
- }
- }
-
requestPathMetricsCollector.shutdown();
unregisterJMX();
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java
index d6cbbe4a1..24e7f5b18 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java
@@ -192,9 +192,7 @@ public class ZooKeeperServerMain {
if (secureCnxnFactory != null) {
secureCnxnFactory.join();
}
- if (zkServer.canShutdown()) {
- zkServer.shutdown(true);
- }
+ zkServer.shutdown(true);
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Server interrupted", e);
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index 29b4c0a61..281682604 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -313,7 +313,7 @@ public class FileTxnSnapLog {
}
/**
- * This function will fast forward the server database to have the latest
+ * This function will fast-forward the server database to have the latest
* transactions in it. This is the same as restore, but only reads from
* the transaction logs and not restores from a snapshot.
* @param dt the datatree to write transactions to.
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
index 2de080bd3..799b8f961 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
@@ -155,11 +155,11 @@ public class LeaderZooKeeperServer extends
QuorumZooKeeperServer {
}
@Override
- public synchronized void shutdown() {
+ protected synchronized void shutdownComponents() {
if (containerManager != null) {
containerManager.stop();
}
- super.shutdown();
+ super.shutdownComponents();
}
@Override
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index bb7be70dd..1ef99e50a 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -916,26 +916,26 @@ public class Learner {
}
void closeSocket() {
- if (sock != null) {
- if (sockBeingClosed.compareAndSet(false, true)) {
- if (closeSocketAsync) {
- final Thread closingThread = new Thread(() ->
closeSockSync(), "CloseSocketThread(sid:" + zk.getServerId());
- closingThread.setDaemon(true);
- closingThread.start();
- } else {
- closeSockSync();
- }
+ if (sockBeingClosed.compareAndSet(false, true)) {
+ if (sock == null) { // Closing before establishing the connection
is a noop
+ return;
+ }
+ Socket socket = sock;
+ sock = null;
+ if (closeSocketAsync) {
+ final Thread closingThread = new Thread(() ->
closeSockSync(socket), "CloseSocketThread(sid:" + zk.getServerId());
+ closingThread.setDaemon(true);
+ closingThread.start();
+ } else {
+ closeSockSync(socket);
}
}
}
- void closeSockSync() {
+ private static void closeSockSync(Socket socket) {
try {
long startTime = Time.currentElapsedTime();
- if (sock != null) {
- sock.close();
- sock = null;
- }
+ socket.close();
ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() -
startTime);
} catch (IOException e) {
LOG.warn("Ignoring error closing connection to leader", e);
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
index efd2e3762..a7c345d07 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
@@ -152,17 +152,7 @@ public abstract class LearnerZooKeeperServer extends
QuorumZooKeeperServer {
}
@Override
- public synchronized void shutdown() {
- if (!canShutdown()) {
- LOG.debug("ZooKeeper server is not running, so not proceeding to
shutdown!");
- return;
- }
- LOG.info("Shutting down");
- try {
- super.shutdown();
- } catch (Exception e) {
- LOG.warn("Ignoring unexpected exception during shutdown", e);
- }
+ protected void shutdownComponents() {
try {
if (syncProcessor != null) {
syncProcessor.shutdown();
@@ -170,6 +160,11 @@ public abstract class LearnerZooKeeperServer extends
QuorumZooKeeperServer {
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception in syncprocessor
shutdown", e);
}
+ try {
+ super.shutdownComponents();
+ } catch (Exception e) {
+ LOG.warn("Ignoring unexpected exception during shutdown", e);
+ }
}
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
index 79aa10ca9..74663a9c1 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
@@ -44,7 +44,7 @@ public class ObserverZooKeeperServer extends
LearnerZooKeeperServer {
* take periodic snapshot. Default is ON.
*/
- private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
+ private final boolean syncRequestProcessorEnabled =
this.self.getSyncEnabled();
/*
* Pending sync requests
@@ -127,18 +127,6 @@ public class ObserverZooKeeperServer extends
LearnerZooKeeperServer {
return "observer";
}
- @Override
- public synchronized void shutdown() {
- if (!canShutdown()) {
- LOG.debug("ZooKeeper server is not running, so not proceeding to
shutdown!");
- return;
- }
- super.shutdown();
- if (syncRequestProcessorEnabled && syncProcessor != null) {
- syncProcessor.shutdown();
- }
- }
-
@Override
public void dumpMonitorValues(BiConsumer<String, Object> response) {
super.dumpMonitorValues(response);
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
index 0efa46770..7e9ff9252 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
@@ -190,11 +190,7 @@ public class ReadOnlyZooKeeperServer extends
ZooKeeperServer {
}
@Override
- public synchronized void shutdown() {
- if (!canShutdown()) {
- LOG.debug("ZooKeeper server is not running, so not proceeding to
shutdown!");
- return;
- }
+ protected void shutdownComponents() {
shutdown = true;
unregisterJMX(this);
@@ -206,7 +202,7 @@ public class ReadOnlyZooKeeperServer extends
ZooKeeperServer {
self.adminServer.setZooKeeperServer(null);
// shutdown the server itself
- super.shutdown();
+ super.shutdownComponents();
}
@Override
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
index 78c9dbd39..3f42c2bd3 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
@@ -46,7 +46,7 @@ public class SendAckRequestProcessor implements
RequestProcessor, Flushable {
learner.writePacket(qp, false);
} catch (IOException e) {
LOG.warn("Closing connection to leader, exception during
packet send", e);
- learner.closeSockSync();
+ learner.closeSocket();
}
}
}
@@ -56,7 +56,7 @@ public class SendAckRequestProcessor implements
RequestProcessor, Flushable {
learner.writePacket(null, true);
} catch (IOException e) {
LOG.warn("Closing connection to leader, exception during packet
send", e);
- learner.closeSockSync();
+ learner.closeSocket();
}
}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerShutdownTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerShutdownTest.java
new file mode 100644
index 000000000..b0dd19cb7
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerShutdownTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Learner;
+import org.apache.zookeeper.server.quorum.LearnerZooKeeperServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class ZooKeeperServerShutdownTest extends ZKTestCase {
+
+ static class ShutdownTrackRequestProcessor implements RequestProcessor {
+ boolean shutdown = false;
+
+ @Override
+ public void processRequest(Request request) throws
RequestProcessorException {
+ }
+
+ @Override
+ public void shutdown() {
+ shutdown = true;
+ }
+ }
+
+ public static class ShutdownTrackLearnerZooKeeperServer extends
LearnerZooKeeperServer {
+ public ShutdownTrackLearnerZooKeeperServer(FileTxnSnapLog logFactory,
QuorumPeer self) throws IOException {
+ super(logFactory, 2000, 2000, 2000, -1, new
ZKDatabase(logFactory), self);
+ }
+
+ @Override
+ protected void setupRequestProcessors() {
+ firstProcessor = new ShutdownTrackRequestProcessor();
+ syncProcessor = new SyncRequestProcessor(this, null);
+ syncProcessor.start();
+ }
+
+ ShutdownTrackRequestProcessor getFirstProcessor() {
+ return (ShutdownTrackRequestProcessor) firstProcessor;
+ }
+
+ SyncRequestProcessor getSyncRequestProcessor() {
+ return syncProcessor;
+ }
+
+ @Override
+ public Learner getLearner() {
+ return null;
+ }
+ }
+
+ @Test
+ void testLearnerZooKeeperServerShutdown(@TempDir File tmpDir) throws
Exception {
+ File tmpFile = File.createTempFile("test", ".dir", tmpDir);
+ tmpFile.delete();
+ FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpFile, tmpFile);
+ ShutdownTrackLearnerZooKeeperServer zooKeeperServer = new
ShutdownTrackLearnerZooKeeperServer(logFactory, new QuorumPeer());
+ zooKeeperServer.startup();
+ zooKeeperServer.shutdown(false);
+ assertTrue(zooKeeperServer.getFirstProcessor().shutdown);
+ assertFalse(zooKeeperServer.getSyncRequestProcessor().isAlive());
+ }
+}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java
index b51cfbe3b..8db90a248 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java
@@ -97,7 +97,7 @@ public class WatchLeakTest {
}
});
- ZKDatabase database = new ZKDatabase(null);
+ ZKDatabase database = new ZKDatabase(mock(FileTxnSnapLog.class));
database.setlastProcessedZxid(2L);
QuorumPeer quorumPeer = mock(QuorumPeer.class);
FileTxnSnapLog logfactory = mock(FileTxnSnapLog.class);