This is an automated email from the ASF dual-hosted git repository.

houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d867ebd4a3 SOLR-17702: Move ZK ConnectionListener logic out of SolrZkClient (#3266)
0d867ebd4a3 is described below

commit 0d867ebd4a37f160b35da57f06ccac6edc390ba5
Author: Houston Putman <[email protected]>
AuthorDate: Tue Mar 18 14:02:43 2025 -0500

    SOLR-17702: Move ZK ConnectionListener logic out of SolrZkClient (#3266)
---
 .../java/org/apache/solr/cloud/ZkController.java   | 66 ++++++++++++++--------
 .../src/java/org/apache/solr/core/ZkContainer.java | 10 ++--
 .../solr/handler/admin/ZookeeperInfoHandler.java   |  2 +-
 .../apache/solr/schema/ZkIndexSchemaReader.java    |  2 +-
 .../org/apache/solr/cloud/LeaderElectionTest.java  |  4 +-
 .../org/apache/solr/common/cloud/OnDisconnect.java |  4 +-
 .../org/apache/solr/common/cloud/OnReconnect.java  |  4 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java | 35 ------------
 .../apache/solr/common/cloud/ZkStateReader.java    | 19 ++++---
 9 files changed, 68 insertions(+), 78 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index ed452830a3b..ff6540d0153 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -77,6 +77,7 @@ import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocCollectionWatcher;
 import org.apache.solr.common.cloud.LiveNodesListener;
 import org.apache.solr.common.cloud.NodesSysPropsCacher;
+import org.apache.solr.common.cloud.OnDisconnect;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.PerReplicaStatesOps;
@@ -201,6 +202,12 @@ public class ZkController implements Closeable {
 
   private CloudHttp2SolrClient cloudSolrClient;
 
+  private final ExecutorService zkConnectionListenerCallbackExecutor =
+      ExecutorUtil.newMDCAwareSingleThreadExecutor(
+          new SolrNamedThreadFactory("zkConnectionListenerCallback"));
+  private final OnReconnect onReconnect = this::onReconnect;
+  private final OnDisconnect onDisconnect = this::onDisconnect;
+
   private final String zkServerAddress; // example: 127.0.0.1:54062/solr
 
   private final int localHostPort; // example: 54065
@@ -249,7 +256,7 @@ public class ZkController implements Closeable {
   // keeps track of a list of objects that need to know a new ZooKeeper 
session was created after
   // expiration occurred ref is held as a HashSet since we clone the set 
before notifying to avoid
   // synchronizing too long
-  private HashSet<OnReconnect> reconnectListeners = new HashSet<>();
+  private final HashSet<OnReconnect> reconnectListeners = new HashSet<>();
 
   private class RegisterCoreAsync implements Callable<Object> {
 
@@ -274,22 +281,6 @@ public class ZkController implements Closeable {
     }
   }
 
-  // notifies registered listeners after the ZK reconnect in the background
-  private static class OnReconnectNotifyAsync implements Callable<Object> {
-
-    private final OnReconnect listener;
-
-    OnReconnectNotifyAsync(OnReconnect listener) {
-      this.listener = listener;
-    }
-
-    @Override
-    public Object call() throws Exception {
-      listener.command();
-      return null;
-    }
-  }
-
   /**
    * @param cc Core container associated with this controller. cannot be null.
    * @param zkServerAddress where to connect to the zk server
@@ -360,12 +351,19 @@ public class ZkController implements Closeable {
             .withUrl(zkServerAddress)
             .withTimeout(clientTimeout, TimeUnit.MILLISECONDS)
             .withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
-            .withReconnectListener(this::onReconnect)
-            .withDisconnectListener((sessionExpired) -> 
onDisconnect(sessionExpired))
             .withAclProvider(zkACLProvider)
             .withClosedCheck(cc::isShutDown)
             .withCompressor(compressor)
             .build();
+
+    zkClient
+        .getCuratorFramework()
+        .getConnectionStateListenable()
+        .addListener(onReconnect, zkConnectionListenerCallbackExecutor);
+    zkClient
+        .getCuratorFramework()
+        .getConnectionStateListenable()
+        .addListener(onDisconnect, zkConnectionListenerCallbackExecutor);
     // Refuse to start if ZK has a non empty /clusterstate.json or a /solr.xml 
file
     checkNoOldClusterstate(zkClient);
 
@@ -487,6 +485,9 @@ public class ZkController implements Closeable {
           } else {
             register(descriptor.getName(), descriptor, true, true, false);
           }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw e;
         } catch (Exception e) {
           log.error("Error registering SolrCore", e);
         }
@@ -502,11 +503,22 @@ public class ZkController implements Closeable {
       for (OnReconnect listener : clonedListeners) {
         try {
           if (executorService != null) {
-            executorService.submit(new OnReconnectNotifyAsync(listener));
+            executorService.execute(
+                () -> {
+                  try {
+                    listener.onReconnect();
+                  } catch (Throwable exc) {
+                    // not much we can do here other than warn in the log
+                    log.warn(
+                        "Error when notifying OnReconnect listener {} after 
session re-connected.",
+                        listener,
+                        exc);
+                  }
+                });
           } else {
-            listener.command();
+            listener.onReconnect();
           }
-        } catch (Exception exc) {
+        } catch (Throwable exc) {
           // not much we can do here other than warn in the log
           log.warn(
               "Error when notifying OnReconnect listener {} after session 
re-connected.",
@@ -689,6 +701,16 @@ public class ZkController implements Closeable {
 
   public void preClose() {
     this.isClosed = true;
+    try {
+      // We do not want to react to connection state changes after we have 
started to close
+      
zkClient.getCuratorFramework().getConnectionStateListenable().removeListener(onReconnect);
+      
zkClient.getCuratorFramework().getConnectionStateListenable().removeListener(onDisconnect);
+      
ExecutorUtil.shutdownNowAndAwaitTermination(zkConnectionListenerCallbackExecutor);
+    } catch (Exception e) {
+      log.warn(
+          "Error stopping and shutting down 
zkConnectionListenerCallbackExecutor. Continue closing the ZkController",
+          e);
+    }
 
     try {
       if (getZkClient().isConnected()) {
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index 89fc8dbf4a9..f6e27ffdaf5 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -237,16 +237,16 @@ public class ZkContainer {
   public void close() {
 
     try {
-      if (zkController != null) {
-        zkController.close();
-      }
+      ExecutorUtil.shutdownAndAwaitTermination(coreZkRegister);
     } finally {
       try {
+        if (zkController != null) {
+          zkController.close();
+        }
+      } finally {
         if (zkServer != null) {
           zkServer.stop();
         }
-      } finally {
-        ExecutorUtil.shutdownAndAwaitTermination(coreZkRegister);
       }
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
index b5a8234a321..5bb3cc95245 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
@@ -344,7 +344,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
 
     /** Called after a ZooKeeper session expiration occurs */
     @Override
-    public void command() {
+    public void onReconnect() {
       // we need to re-establish the watcher on the collections list after 
session expires
       synchronized (this) {
         cachedCollections = null;
diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index 3112bdced58..15d479608f4 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -225,7 +225,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
    * the current schema from ZooKeeper.
    */
   @Override
-  public void command() {
+  public void onReconnect() {
     try {
       // setup a new watcher to get notified when the managed schema changes
       schemaWatcher = createSchemaWatcher();
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
index d4a61abcc60..6c430f6e64b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
@@ -114,8 +114,10 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
               .withUrl(server.getZkAddress())
               .withTimeout(TIMEOUT, TimeUnit.MILLISECONDS)
               .withConnTimeOut(TIMEOUT, TimeUnit.MILLISECONDS)
-              .withReconnectListener(onReconnect)
               .build();
+      if (onReconnect != null) {
+        
zkClient.getCuratorFramework().getConnectionStateListenable().addListener(onReconnect);
+      }
       zkStateReader = new ZkStateReader(zkClient);
       elector = new LeaderElector(zkClient);
       zkController = MockSolrSource.makeSimpleMock(null, zkStateReader, 
zkClient);
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
index cd24a2ec4d1..9535a59cef5 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnDisconnect.java
@@ -21,12 +21,12 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 
 public interface OnDisconnect extends ConnectionStateListener {
-  public void command(boolean sessionExpired);
+  void onDisconnect(boolean sessionExpired);
 
   @Override
   default void stateChanged(CuratorFramework client, ConnectionState newState) 
{
     if (newState == ConnectionState.LOST || newState == 
ConnectionState.SUSPENDED) {
-      command(newState == ConnectionState.LOST);
+      onDisconnect(newState == ConnectionState.LOST);
     }
   }
 }
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
index 906a8368c35..8d54312d3e0 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/OnReconnect.java
@@ -28,12 +28,12 @@ import org.apache.curator.framework.state.ConnectionStateListener;
  * to be notified of ZK reconnection events.
  */
 public interface OnReconnect extends ConnectionStateListener {
-  void command();
+  void onReconnect();
 
   @Override
   default void stateChanged(CuratorFramework client, ConnectionState newState) 
{
     if (ConnectionState.RECONNECTED.equals(newState)) {
-      command();
+      onReconnect();
     }
   }
 }
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 3583f94592c..72cecda6567 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -97,9 +97,6 @@ public class SolrZkClient implements Closeable {
 
   private final ExecutorService zkCallbackExecutor =
       ExecutorUtil.newMDCAwareCachedThreadPool(new 
SolrNamedThreadFactory("zkCallback"));
-  private final ExecutorService zkConnectionListenerCallbackExecutor =
-      ExecutorUtil.newMDCAwareSingleThreadExecutor(
-          new SolrNamedThreadFactory("zkConnectionListenerCallback"));
 
   private volatile boolean isClosed = false;
   private int zkClientTimeout;
@@ -121,8 +118,6 @@ public class SolrZkClient implements Closeable {
         builder.zkClientConnectTimeout,
         builder.zkCredentialsProvider,
         builder.aclProvider,
-        builder.onReconnect,
-        builder.onDisconnect,
         builder.higherLevelIsClosed,
         builder.minStateByteLenForCompression,
         builder.compressor,
@@ -136,8 +131,6 @@ public class SolrZkClient implements Closeable {
       int clientConnectTimeout,
       ZkCredentialsProvider zkCredentialsProvider,
       ACLProvider aclProvider,
-      final OnReconnect onReconnect,
-      OnDisconnect onDisconnect,
       IsClosed higherLevelIsClosed,
       int minStateByteLenForCompression,
       Compressor compressor,
@@ -205,16 +198,6 @@ public class SolrZkClient implements Closeable {
                 new SolrZkCompressionProvider(compressor, 
minStateByteLenForCompression))
             .enableCompression()
             .build();
-    if (onReconnect != null) {
-      client
-          .getConnectionStateListenable()
-          .addListener(onReconnect, zkConnectionListenerCallbackExecutor);
-    }
-    if (onDisconnect != null) {
-      client
-          .getConnectionStateListenable()
-          .addListener(onDisconnect, zkConnectionListenerCallbackExecutor);
-    }
     client.start();
     try {
       if (!client.blockUntilConnected(clientConnectTimeout, 
TimeUnit.MILLISECONDS)) {
@@ -935,12 +918,6 @@ public class SolrZkClient implements Closeable {
     } catch (Exception e) {
       log.error("Error shutting down zkCallbackExecutor", e);
     }
-
-    try {
-      
ExecutorUtil.shutdownAndAwaitTermination(zkConnectionListenerCallbackExecutor);
-    } catch (Exception e) {
-      log.error("Error shutting down zkConnManagerCallbackExecutor", e);
-    }
   }
 
   /**
@@ -1228,8 +1205,6 @@ public class SolrZkClient implements Closeable {
     public String zkServerAddress;
     public int zkClientTimeout = SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;
     public int zkClientConnectTimeout = 
SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
-    public OnReconnect onReconnect;
-    public OnDisconnect onDisconnect;
     public ZkCredentialsProvider zkCredentialsProvider;
     public ACLProvider aclProvider;
     public IsClosed higherLevelIsClosed;
@@ -1280,16 +1255,6 @@ public class SolrZkClient implements Closeable {
       return this;
     }
 
-    public Builder withReconnectListener(OnReconnect onReconnect) {
-      this.onReconnect = onReconnect;
-      return this;
-    }
-
-    public Builder withDisconnectListener(OnDisconnect onDisconnect) {
-      this.onDisconnect = onDisconnect;
-      return this;
-    }
-
     public Builder withZkCredentialsProvider(ZkCredentialsProvider 
zkCredentialsProvider) {
       this.zkCredentialsProvider = zkCredentialsProvider;
       return this;
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 948050acf9c..aa8b4edc799 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -407,29 +407,30 @@ public class ZkStateReader implements SolrCloseable {
       int zkClientTimeout,
       int zkClientConnectTimeout,
       boolean canUseZkACLs) {
-    SolrZkClient.Builder builder =
+    this.zkClient =
         new SolrZkClient.Builder()
             .withUrl(zkServerAddress)
             .withTimeout(zkClientTimeout, TimeUnit.MILLISECONDS)
             .withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
             .withUseDefaultCredsAndACLs(canUseZkACLs)
-            .withReconnectListener(
+            .build();
+    this.zkClient
+        .getCuratorFramework()
+        .getConnectionStateListenable()
+        .addListener(
+            (OnReconnect)
                 () -> {
                   // on reconnect, reload cloud info
                   try {
                     this.createClusterStateWatchersAndUpdate();
-                  } catch (KeeperException e) {
-                    log.error("A ZK error has occurred", e);
-                    throw new ZooKeeperException(
-                        ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
                   } catch (InterruptedException e) {
                     // Restore the interrupted status
                     Thread.currentThread().interrupt();
-                    log.error("Interrupted", e);
-                    throw new ZooKeeperException(ErrorCode.SERVER_ERROR, 
"Interrupted", e);
+                    log.warn("Interrupted", e);
+                  } catch (Throwable e) {
+                    log.error("An error has occurred while updating the 
cluster state", e);
                   }
                 });
-    this.zkClient = builder.build();
     this.closeClient = true;
     this.securityNodeWatcher = null;
     collectionPropertiesZkStateReader = new 
CollectionPropertiesZkStateReader(this);

Reply via email to