This is an automated email from the ASF dual-hosted git repository.
houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 0d867ebd4a3 SOLR-17702: Move ZK ConnectionListener logic out of SolrZkClient (#3266)
0d867ebd4a3 is described below
commit 0d867ebd4a37f160b35da57f06ccac6edc390ba5
Author: Houston Putman <[email protected]>
AuthorDate: Tue Mar 18 14:02:43 2025 -0500
SOLR-17702: Move ZK ConnectionListener logic out of SolrZkClient (#3266)
---
.../java/org/apache/solr/cloud/ZkController.java | 66 ++++++++++++++--------
.../src/java/org/apache/solr/core/ZkContainer.java | 10 ++--
.../solr/handler/admin/ZookeeperInfoHandler.java | 2 +-
.../apache/solr/schema/ZkIndexSchemaReader.java | 2 +-
.../org/apache/solr/cloud/LeaderElectionTest.java | 4 +-
.../org/apache/solr/common/cloud/OnDisconnect.java | 4 +-
.../org/apache/solr/common/cloud/OnReconnect.java | 4 +-
.../org/apache/solr/common/cloud/SolrZkClient.java | 35 ------------
.../apache/solr/common/cloud/ZkStateReader.java | 19 ++++---
9 files changed, 68 insertions(+), 78 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index ed452830a3b..ff6540d0153 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -77,6 +77,7 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.common.cloud.NodesSysPropsCacher;
+import org.apache.solr.common.cloud.OnDisconnect;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
@@ -201,6 +202,12 @@ public class ZkController implements Closeable {
private CloudHttp2SolrClient cloudSolrClient;
+ private final ExecutorService zkConnectionListenerCallbackExecutor =
+ ExecutorUtil.newMDCAwareSingleThreadExecutor(
+ new SolrNamedThreadFactory("zkConnectionListenerCallback"));
+ private final OnReconnect onReconnect = this::onReconnect;
+ private final OnDisconnect onDisconnect = this::onDisconnect;
+
private final String zkServerAddress; // example: 127.0.0.1:54062/solr
private final int localHostPort; // example: 54065
@@ -249,7 +256,7 @@ public class ZkController implements Closeable {
// keeps track of a list of objects that need to know a new ZooKeeper
session was created after
// expiration occurred ref is held as a HashSet since we clone the set
before notifying to avoid
// synchronizing too long
- private HashSet<OnReconnect> reconnectListeners = new HashSet<>();
+ private final HashSet<OnReconnect> reconnectListeners = new HashSet<>();
private class RegisterCoreAsync implements Callable<Object> {
@@ -274,22 +281,6 @@ public class ZkController implements Closeable {
}
}
- // notifies registered listeners after the ZK reconnect in the background
- private static class OnReconnectNotifyAsync implements Callable<Object> {
-
- private final OnReconnect listener;
-
- OnReconnectNotifyAsync(OnReconnect listener) {
- this.listener = listener;
- }
-
- @Override
- public Object call() throws Exception {
- listener.command();
- return null;
- }
- }
-
/**
* @param cc Core container associated with this controller. cannot be null.
* @param zkServerAddress where to connect to the zk server
@@ -360,12 +351,19 @@ public class ZkController implements Closeable {
.withUrl(zkServerAddress)
.withTimeout(clientTimeout, TimeUnit.MILLISECONDS)
.withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
- .withReconnectListener(this::onReconnect)
- .withDisconnectListener((sessionExpired) ->
onDisconnect(sessionExpired))
.withAclProvider(zkACLProvider)
.withClosedCheck(cc::isShutDown)
.withCompressor(compressor)
.build();
+
+ zkClient
+ .getCuratorFramework()
+ .getConnectionStateListenable()
+ .addListener(onReconnect, zkConnectionListenerCallbackExecutor);
+ zkClient
+ .getCuratorFramework()
+ .getConnectionStateListenable()
+ .addListener(onDisconnect, zkConnectionListenerCallbackExecutor);
// Refuse to start if ZK has a non empty /clusterstate.json or a /solr.xml
file
checkNoOldClusterstate(zkClient);
@@ -487,6 +485,9 @@ public class ZkController implements Closeable {
} else {
register(descriptor.getName(), descriptor, true, true, false);
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
} catch (Exception e) {
log.error("Error registering SolrCore", e);
}
@@ -502,11 +503,22 @@ public class ZkController implements Closeable {
for (OnReconnect listener : clonedListeners) {
try {
if (executorService != null) {
- executorService.submit(new OnReconnectNotifyAsync(listener));
+ executorService.execute(
+ () -> {
+ try {
+ listener.onReconnect();
+ } catch (Throwable exc) {
+ // not much we can do here other than warn in the log
+ log.warn(
+ "Error when notifying OnReconnect listener {} after
session re-connected.",
+ listener,
+ exc);
+ }
+ });
} else {
- listener.command();
+ listener.onReconnect();
}
- } catch (Exception exc) {
+ } catch (Throwable exc) {
// not much we can do here other than warn in the log
log.warn(
"Error when notifying OnReconnect listener {} after session
re-connected.",
@@ -689,6 +701,16 @@ public class ZkController implements Closeable {
public void preClose() {
this.isClosed = true;
+ try {
+ // We do not want to react to connection state changes after we have
started to close
+
zkClient.getCuratorFramework().getConnectionStateListenable().removeListener(onReconnect);
+
zkClient.getCuratorFramework().getConnectionStateListenable().removeListener(onDisconnect);
+
ExecutorUtil.shutdownNowAndAwaitTermination(zkConnectionListenerCallbackExecutor);
+ } catch (Exception e) {
+ log.warn(
+ "Error stopping and shutting down
zkConnectionListenerCallbackExecutor. Continue closing the ZkController",
+ e);
+ }
try {
if (getZkClient().isConnected()) {
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index 89fc8dbf4a9..f6e27ffdaf5 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -237,16 +237,16 @@ public class ZkContainer {
public void close() {
try {
- if (zkController != null) {
- zkController.close();
- }
+ ExecutorUtil.shutdownAndAwaitTermination(coreZkRegister);
} finally {
try {
+ if (zkController != null) {
+ zkController.close();
+ }
+ } finally {
if (zkServer != null) {
zkServer.stop();
}
- } finally {
- ExecutorUtil.shutdownAndAwaitTermination(coreZkRegister);
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
index b5a8234a321..5bb3cc95245 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
@@ -344,7 +344,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
/** Called after a ZooKeeper session expiration occurs */
@Override
- public void command() {
+ public void onReconnect() {
// we need to re-establish the watcher on the collections list after
session expires
synchronized (this) {
cachedCollections = null;
diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index 3112bdced58..15d479608f4 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -225,7 +225,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
* the current schema from ZooKeeper.
*/
@Override
- public void command() {
+ public void onReconnect() {
try {
// setup a new watcher to get notified when the managed schema changes
schemaWatcher = createSchemaWatcher();
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
index d4a61abcc60..6c430f6e64b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
@@ -114,8 +114,10 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
.withUrl(server.getZkAddress())
.withTimeout(TIMEOUT, TimeUnit.MILLISECONDS)
.withConnTimeOut(TIMEOUT, TimeUnit.MILLISECONDS)
- .withReconnectListener(onReconnect)
.build();
+ if (onReconnect != null) {
+
zkClient.getCuratorFramework().getConnectionStateListenable().addListener(onReconnect);
+ }
zkStateReader = new ZkStateReader(zkClient);
elector = new LeaderElector(zkClient);
zkController = MockSolrSource.makeSimpleMock(null, zkStateReader,
zkClient);
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
index cd24a2ec4d1..9535a59cef5 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
@@ -21,12 +21,12 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
public interface OnDisconnect extends ConnectionStateListener {
- public void command(boolean sessionExpired);
+ void onDisconnect(boolean sessionExpired);
@Override
default void stateChanged(CuratorFramework client, ConnectionState newState)
{
if (newState == ConnectionState.LOST || newState ==
ConnectionState.SUSPENDED) {
- command(newState == ConnectionState.LOST);
+ onDisconnect(newState == ConnectionState.LOST);
}
}
}
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
index 906a8368c35..8d54312d3e0 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
@@ -28,12 +28,12 @@ import org.apache.curator.framework.state.ConnectionStateListener;
* to be notified of ZK reconnection events.
*/
public interface OnReconnect extends ConnectionStateListener {
- void command();
+ void onReconnect();
@Override
default void stateChanged(CuratorFramework client, ConnectionState newState)
{
if (ConnectionState.RECONNECTED.equals(newState)) {
- command();
+ onReconnect();
}
}
}
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 3583f94592c..72cecda6567 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -97,9 +97,6 @@ public class SolrZkClient implements Closeable {
private final ExecutorService zkCallbackExecutor =
ExecutorUtil.newMDCAwareCachedThreadPool(new
SolrNamedThreadFactory("zkCallback"));
- private final ExecutorService zkConnectionListenerCallbackExecutor =
- ExecutorUtil.newMDCAwareSingleThreadExecutor(
- new SolrNamedThreadFactory("zkConnectionListenerCallback"));
private volatile boolean isClosed = false;
private int zkClientTimeout;
@@ -121,8 +118,6 @@ public class SolrZkClient implements Closeable {
builder.zkClientConnectTimeout,
builder.zkCredentialsProvider,
builder.aclProvider,
- builder.onReconnect,
- builder.onDisconnect,
builder.higherLevelIsClosed,
builder.minStateByteLenForCompression,
builder.compressor,
@@ -136,8 +131,6 @@ public class SolrZkClient implements Closeable {
int clientConnectTimeout,
ZkCredentialsProvider zkCredentialsProvider,
ACLProvider aclProvider,
- final OnReconnect onReconnect,
- OnDisconnect onDisconnect,
IsClosed higherLevelIsClosed,
int minStateByteLenForCompression,
Compressor compressor,
@@ -205,16 +198,6 @@ public class SolrZkClient implements Closeable {
new SolrZkCompressionProvider(compressor,
minStateByteLenForCompression))
.enableCompression()
.build();
- if (onReconnect != null) {
- client
- .getConnectionStateListenable()
- .addListener(onReconnect, zkConnectionListenerCallbackExecutor);
- }
- if (onDisconnect != null) {
- client
- .getConnectionStateListenable()
- .addListener(onDisconnect, zkConnectionListenerCallbackExecutor);
- }
client.start();
try {
if (!client.blockUntilConnected(clientConnectTimeout,
TimeUnit.MILLISECONDS)) {
@@ -935,12 +918,6 @@ public class SolrZkClient implements Closeable {
} catch (Exception e) {
log.error("Error shutting down zkCallbackExecutor", e);
}
-
- try {
-
ExecutorUtil.shutdownAndAwaitTermination(zkConnectionListenerCallbackExecutor);
- } catch (Exception e) {
- log.error("Error shutting down zkConnManagerCallbackExecutor", e);
- }
}
/**
@@ -1228,8 +1205,6 @@ public class SolrZkClient implements Closeable {
public String zkServerAddress;
public int zkClientTimeout = SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;
public int zkClientConnectTimeout =
SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
- public OnReconnect onReconnect;
- public OnDisconnect onDisconnect;
public ZkCredentialsProvider zkCredentialsProvider;
public ACLProvider aclProvider;
public IsClosed higherLevelIsClosed;
@@ -1280,16 +1255,6 @@ public class SolrZkClient implements Closeable {
return this;
}
- public Builder withReconnectListener(OnReconnect onReconnect) {
- this.onReconnect = onReconnect;
- return this;
- }
-
- public Builder withDisconnectListener(OnDisconnect onDisconnect) {
- this.onDisconnect = onDisconnect;
- return this;
- }
-
public Builder withZkCredentialsProvider(ZkCredentialsProvider
zkCredentialsProvider) {
this.zkCredentialsProvider = zkCredentialsProvider;
return this;
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 948050acf9c..aa8b4edc799 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -407,29 +407,30 @@ public class ZkStateReader implements SolrCloseable {
int zkClientTimeout,
int zkClientConnectTimeout,
boolean canUseZkACLs) {
- SolrZkClient.Builder builder =
+ this.zkClient =
new SolrZkClient.Builder()
.withUrl(zkServerAddress)
.withTimeout(zkClientTimeout, TimeUnit.MILLISECONDS)
.withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
.withUseDefaultCredsAndACLs(canUseZkACLs)
- .withReconnectListener(
+ .build();
+ this.zkClient
+ .getCuratorFramework()
+ .getConnectionStateListenable()
+ .addListener(
+ (OnReconnect)
() -> {
// on reconnect, reload cloud info
try {
this.createClusterStateWatchersAndUpdate();
- } catch (KeeperException e) {
- log.error("A ZK error has occurred", e);
- throw new ZooKeeperException(
- ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
- log.error("Interrupted", e);
- throw new ZooKeeperException(ErrorCode.SERVER_ERROR,
"Interrupted", e);
+ log.warn("Interrupted", e);
+ } catch (Throwable e) {
+ log.error("An error has occurred while updating the
cluster state", e);
}
});
- this.zkClient = builder.build();
this.closeClient = true;
this.securityNodeWatcher = null;
collectionPropertiesZkStateReader = new
CollectionPropertiesZkStateReader(this);