Make ZkClient keep retrying to reconnect when its session expires.

This prevents ZooKeeper reconnection from failing permanently because of a transient
network issue. With this change in ZkClient, the retry loop in HelixManager is no longer
needed, so deprecate the related system property (zk.connectionReEstablishment.timeout)
and simplify the handleSessionEstablishmentError logic.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/05583449
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/05583449
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/05583449

Branch: refs/heads/master
Commit: 0558344963115414cbec15c752d44b4eec3cc6c4
Parents: c145c7c
Author: Jiajun Wang <[email protected]>
Authored: Tue Jun 5 10:44:55 2018 -0700
Committer: jiajunwang <[email protected]>
Committed: Thu Jul 12 13:45:17 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/SystemPropertyKeys.java    |   1 +
 .../apache/helix/manager/zk/ZKHelixManager.java |  35 +--
 .../helix/manager/zk/zookeeper/ZkClient.java    |  39 ++-
 .../helix/util/ExponentialBackoffStrategy.java  |  33 +++
 .../helix/integration/TestZkReconnect.java      | 241 -------------------
 .../helix/manager/zk/TestZkReconnect.java       | 239 ++++++++++++++++++
 6 files changed, 310 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
index 7af9635..aa8535b 100644
--- a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -15,6 +15,11 @@ public class SystemPropertyKeys {
 
   public static final String ZK_CONNECTION_TIMEOUT = "zk.connection.timeout";
 
+  /**
+   * @deprecated No longer read by ZKHelixManager: ZkClient now keeps retrying to reconnect after
+   *             a session expiry, so HelixManager no longer needs its own reconnect retry timeout.
+   */
+  @Deprecated
   public static final String ZK_REESTABLISHMENT_CONNECTION_TIMEOUT =
       "zk.connectionReEstablishment.timeout";
 

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 4eb037a..5620ef6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -68,7 +68,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   public static final int FLAPPING_TIME_WINDOW = 300000; // Default to 300 sec
   public static final int MAX_DISCONNECT_THRESHOLD = 5;
   public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
-  private static final int DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT = 120000; // Default to 120 sec
   private static final int DEFAULT_WAIT_CONNECTED_TIMEOUT = 10 * 1000;  // wait until connected for up to 10 seconds.
 
   protected final String _zkAddress;
@@ -78,7 +77,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   private final int _waitForConnectedTimeout; // wait time for testing connect
   private final int _sessionTimeout; // client side session timeout, will be overridden by server timeout. Disconnect after timeout
   private final int _connectionInitTimeout; // client timeout to init connect
-  private final int _connectionRetryTimeout; // retry when connect being re-established
   private final List<PreConnectCallback> _preConnectCallbacks;
   protected final List<CallbackHandler> _handlers;
   private final HelixManagerProperties _properties;
@@ -233,10 +231,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         .getSystemPropertyAsInt(SystemPropertyKeys.ZK_CONNECTION_TIMEOUT,
             ZkClient.DEFAULT_CONNECTION_TIMEOUT);
 
-    _connectionRetryTimeout = HelixUtil
-        .getSystemPropertyAsInt(SystemPropertyKeys.ZK_REESTABLISHMENT_CONNECTION_TIMEOUT,
-            DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT);
-
     _waitForConnectedTimeout = HelixUtil
         .getSystemPropertyAsInt(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT,
             DEFAULT_WAIT_CONNECTED_TIMEOUT);
@@ -1060,38 +1054,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
   @Override
   public void handleSessionEstablishmentError(Throwable error) throws Exception {
-    LOG.warn("Handling Session Establishment Error. Try to reset connection.", error);
+    LOG.warn("Handling Session Establishment Error. Disconnect Helix Manager.", error);
     // Cleanup ZKHelixManager
     if (_zkclient != null) {
       _zkclient.close();
     }
     disconnect();
-    // Try to establish connections
-    long operationStartTime = System.currentTimeMillis();
-    while (!isConnected()) {
-      try {
-        connect();
-        break;
-      } catch (Exception e) {
-        if (System.currentTimeMillis() - operationStartTime >= _connectionRetryTimeout) {
-          break;
-        }
-        // If retry fails, use the latest exception.
-        error = e;
-        LOG.error("Fail to reset connection after session establishment error happens. Will retry.", error);
-        // Yield until next retry.
-        Thread.yield();
-      }
-    }
 
-    if (!isConnected()) {
-      LOG.error("Fail to reset connection after session establishment error happens.", error);
-      // retry failed, trigger error handler
-      if (_stateListener != null) {
-        _stateListener.onDisconnected(this, error);
-      }
-    } else {
-      LOG.info("Connection is recovered.");
+    if (_stateListener != null) {
+      _stateListener.onDisconnected(this, error);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 0aa6587..0939826 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -45,6 +45,7 @@ import org.apache.helix.manager.zk.PathBasedZkSerializer;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks;
 import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
+import org.apache.helix.util.ExponentialBackoffStrategy;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -67,6 +68,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ZkClient implements Watcher {
   private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
+  private static final long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
 
   protected final IZkConnection _connection;
   protected final long operationRetryTimeoutInMillis;
@@ -783,16 +785,49 @@ public class ZkClient implements Watcher {
     }
     fireStateChangedEvent(event.getState());
     if (event.getState() == KeeperState.Expired) {
+      reconnectOnExpiring();
+    }
+  }
+
+  /**
+   * Keeps trying to re-establish the ZooKeeper session after it has expired, sleeping for an
+   * exponentially growing, jittered interval (capped at MAX_RECONNECT_INTERVAL_MS) between failed
+   * attempts. Gives up only when the client is closed or the thread is interrupted; the last failure
+   * (or a "Shutdown triggered." ZkException) is then passed to fireSessionEstablishmentError.
+   */
+  private void reconnectOnExpiring() {
+    int retryCount = 0;
+    ExponentialBackoffStrategy retryStrategy =
+        new ExponentialBackoffStrategy(MAX_RECONNECT_INTERVAL_MS, true);
+
+    Exception reconnectException = new ZkException("Shutdown triggered.");
+    while (!_closed) {
       try {
         reconnect();
         fireNewSessionEvents();
-      } catch (final Exception e) {
-        LOG.warn(
-            "Unable to re-establish connection. Notifying consumer of the following exception: ",
-            e);
-        fireSessionEstablishmentError(e);
+        return;
+      } catch (ZkInterruptedException interrupt) {
+        reconnectException = interrupt;
+        break;
+      } catch (Exception e) {
+        reconnectException = e;
+        // Pass the number of failed attempts so far (1 after the first failure).
+        long waitInterval = retryStrategy.getNextWaitInterval(++retryCount);
+        LOG.warn("ZkClient reconnect on expiring failed. Will retry after {} ms", waitInterval, e);
+        try {
+          Thread.sleep(waitInterval);
+        } catch (InterruptedException ex) {
+          // Restore the interrupt status so code further up this thread's stack can observe it.
+          Thread.currentThread().interrupt();
+          reconnectException = ex;
+          break;
+        }
       }
     }
+
+    LOG.warn("Unable to re-establish connection. Notifying consumer of the following exception: ",
+        reconnectException);
+    fireSessionEstablishmentError(reconnectException);
   }
 
   private void fireNewSessionEvents() {
@@ -1437,6 +1472,9 @@ public class ZkClient implements Watcher {
    */
   public void connect(final long maxMsToWaitUntilConnected, Watcher watcher)
       throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
+    if (_closed) {
+      throw new IllegalStateException("ZkClient already closed!");
+    }
     boolean started = false;
     acquireEventLock();
     try {

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java b/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java
new file mode 100644
index 0000000..b1a66c9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java
@@ -0,0 +1,70 @@
+package org.apache.helix.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Random;
+
+/**
+ * Computes retry wait intervals that double with every failed attempt, optionally capped at a
+ * maximum and randomized (jittered).
+ */
+public class ExponentialBackoffStrategy {
+  /** Un-jittered wait, in milliseconds, returned after the first failed attempt. */
+  private static final long INIT_RETRY_INTERVAL = 500;
+  private final long _maxRetryInterval;
+  private final boolean _addJitter;
+  private final Random _ran;
+
+  /**
+   * @param maxRetryInterval upper bound, in milliseconds, for the un-jittered interval; a value
+   *                         {@code <= 0} disables the cap
+   * @param addJitter        if true, scale each interval by a random factor in [0.75, 1.0)
+   */
+  public ExponentialBackoffStrategy(long maxRetryInterval, boolean addJitter) {
+    _maxRetryInterval = maxRetryInterval;
+    _addJitter = addJitter;
+    _ran = new Random(System.currentTimeMillis());
+  }
+
+  /**
+   * Returns how long to wait before the next attempt: INIT_RETRY_INTERVAL * 2^(n - 1) for
+   * n = numberOfTriesFailed, capped at the configured maximum and then optionally jittered.
+   *
+   * @param numberOfTriesFailed number of attempts that have failed so far (1 after the first
+   *                            failure); smaller values yield intervals below INIT_RETRY_INTERVAL
+   * @return wait interval in milliseconds; saturates at Long.MAX_VALUE when no cap is set
+   */
+  public long getNextWaitInterval(int numberOfTriesFailed) {
+    double exponentialMultiplier = Math.pow(2.0, numberOfTriesFailed - 1);
+    double result = exponentialMultiplier * INIT_RETRY_INTERVAL;
+
+    if (_maxRetryInterval > 0 && result > _maxRetryInterval) {
+      result = _maxRetryInterval;
+    }
+
+    if (_addJitter) {
+      // Scale the result to 75%-100% of its value instead of adding jitter on top of it, so the
+      // jittered interval never exceeds the configured max retry interval.
+      result = result * (0.75 + _ran.nextDouble() * 0.25);
+    }
+
+    return (long) result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
deleted file mode 100644
index aa23257..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
+++ /dev/null
@@ -1,241 +0,0 @@
-package org.apache.helix.integration;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.TestHelper;
-import org.apache.helix.manager.zk.HelixManagerStateListener;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestZkReconnect {
-  private static final Logger LOG = LoggerFactory.getLogger(TestZkReconnect.class);
-
-  @Test (enabled = false)
-  public void testZKReconnect() throws Exception {
-    final AtomicReference<ZkServer> zkServerRef = new AtomicReference<ZkServer>();
-    final int zkPort = TestHelper.getRandomPort();
-    final String zkAddr = String.format("localhost:%d", zkPort);
-    ZkServer zkServer = TestHelper.startZkServer(zkAddr);
-    zkServerRef.set(zkServer);
-
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-
-    // Setup cluster
-    LOG.info("Setup clusters");
-    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
-    clusterSetup.addCluster(clusterName, true);
-
-    // Registers and starts controller
-    LOG.info("Starts controller");
-    HelixManager controller =
-        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr);
-    controller.connect();
-
-    // Registers and starts participant
-    LOG.info("Starts participant");
-    String hostname = "localhost";
-    String instanceId = String.format("%s_%d", hostname, 1);
-    clusterSetup.addInstanceToCluster(clusterName, instanceId);
-    HelixManager participant =
-        HelixManagerFactory.getZKHelixManager(clusterName, instanceId, InstanceType.PARTICIPANT,
-            zkAddr);
-    participant.connect();
-
-    LOG.info("Register state machine");
-    final CountDownLatch latch = new CountDownLatch(1);
-    participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
-        new StateModelFactory<StateModel>() {
-          @Override
-          public StateModel createNewStateModel(String resource, String stateUnitKey) {
-            return new SimpleStateModel(latch);
-          }
-        }, "test");
-
-    String resourceName = "test-resource";
-    LOG.info("Ideal state assignment");
-    HelixAdmin helixAdmin = participant.getClusterManagmentTool();
-    helixAdmin.addResource(clusterName, resourceName, 1, "OnlineOffline",
-        IdealState.RebalanceMode.CUSTOMIZED.toString());
-
-    IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resourceName);
-    idealState.setReplicas("1");
-    idealState.setStateModelFactoryName("test");
-    idealState.setPartitionState(resourceName + "_0", instanceId, "ONLINE");
-
-    LOG.info("Shutdown ZK server");
-    TestHelper.stopZkServer(zkServerRef.get());
-    Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
-
-      @Override
-      public void run() {
-        try {
-          LOG.info("Restart ZK server");
-          // zkServer.set(TestUtils.startZookeeper(zkDir, zkPort));
-          zkServerRef.set(TestHelper.startZkServer(zkAddr, null, false));
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        }
-      }
-    }, 2L, TimeUnit.SECONDS);
-
-    // future.get();
-
-    LOG.info("Before update ideal state");
-    helixAdmin.setResourceIdealState(clusterName, resourceName, idealState);
-    LOG.info("After update ideal state");
-
-    LOG.info("Wait for OFFLINE->ONLINE state transition");
-    try {
-      Assert.assertTrue(latch.await(15, TimeUnit.SECONDS));
-
-      // wait until stable state
-      boolean result =
-          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkAddr,
-              clusterName));
-      Assert.assertTrue(result);
-
-    } finally {
-      participant.disconnect();
-      zkServerRef.get().shutdown();
-    }
-  }
-
-  @Test
-  public void testHelixManagerStateListenerCallback() throws Exception {
-    final int zkPort = TestHelper.getRandomPort();
-    final String zkAddr = String.format("localhost:%d", zkPort);
-    final ZkServer zkServer = TestHelper.startZkServer(zkAddr);
-
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    final String clusterName = className + "_" + methodName;
-
-    // Init onDisconnectedFlag to check if callback is triggered
-    final AtomicReference<Boolean> onDisconnectedFlag = new AtomicReference<>(false);
-    final AtomicReference<Boolean> onConnectedFlag = new AtomicReference<>(false);
-
-    // Setup cluster
-    LOG.info("Setup clusters");
-    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
-    clusterSetup.addCluster(clusterName, true);
-    // For fast test, set short timeout
-    System.setProperty("zk.connection.timeout", "2000");
-    System.setProperty("zk.connectionReEstablishment.timeout", "1000");
-
-    // Registers and starts controller, register listener for disconnect handling
-    LOG.info("Starts controller");
-    final ZKHelixManager controller =
-        (ZKHelixManager) HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr,
-            new HelixManagerStateListener() {
-              @Override
-              public void onConnected(HelixManager helixManager) throws Exception {
-                Assert.assertEquals(helixManager.getClusterName(), clusterName);
-                onConnectedFlag.getAndSet(true);
-              }
-
-              @Override
-              public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception {
-                Assert.assertEquals(helixManager.getClusterName(), clusterName);
-                onDisconnectedFlag.getAndSet(true);
-              }
-            });
-
-    try {
-      controller.connect();
-      Assert.assertTrue(onConnectedFlag.getAndSet(false));
-      ZkHelixPropertyStore propertyStore = controller.getHelixPropertyStore();
-
-      // 1. shutdown zkServer and check if handler trigger callback
-      zkServer.shutdown();
-
-      // Retry will fail, and onDisconnectedFlag should be set within onDisconnected handler
-      controller.handleSessionEstablishmentError(new Exception("For testing"));
-      Assert.assertTrue(onDisconnectedFlag.get());
-      Assert.assertFalse(onConnectedFlag.get());
-      Assert.assertFalse(controller.isConnected());
-
-      // Verify ZK is down
-      try {
-        propertyStore.get("/", null, 0);
-        Assert.fail("propertyStore should be disconnected.");
-      } catch (IllegalStateException e) {
-        // Expected exception
-        System.out.println(e.getMessage());
-      }
-
-      // 2. restart zkServer and check if handler will recover connection
-      onDisconnectedFlag.getAndSet(false);
-      zkServer.start();
-
-      // Retry will succeed, and onDisconnectedFlag should not be set
-      controller.handleSessionEstablishmentError(new Exception("For testing"));
-      Assert.assertFalse(onDisconnectedFlag.get());
-      Assert.assertTrue(onConnectedFlag.get());
-
-      // New propertyStore should be in good state
-      propertyStore = controller.getHelixPropertyStore();
-      propertyStore.get("/", null, 0);
-    } finally {
-      controller.disconnect();
-      zkServer.shutdown();
-      System.clearProperty("zk.connection.timeout");
-      System.clearProperty("zk.connectionReEstablishment.timeout");
-    }
-  }
-
-  public static final class SimpleStateModel extends StateModel {
-
-    private final CountDownLatch latch;
-
-    public SimpleStateModel(CountDownLatch latch) {
-      this.latch = latch;
-    }
-
-    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
-      // LOG.info(HelixUtils.toString(message));
-      LOG.info("message: " + message);
-      latch.countDown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
new file mode 100644
index 0000000..e2d36b2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
@@ -0,0 +1,245 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkReconnect {
+  private static final Logger LOG = LoggerFactory.getLogger(TestZkReconnect.class);
+
+  @Test (enabled = false)
+  public void testZKReconnect() throws Exception {
+    final AtomicReference<ZkServer> zkServerRef = new AtomicReference<ZkServer>();
+    final int zkPort = TestHelper.getRandomPort();
+    final String zkAddr = String.format("localhost:%d", zkPort);
+    ZkServer zkServer = TestHelper.startZkServer(zkAddr);
+    zkServerRef.set(zkServer);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    // Setup cluster
+    LOG.info("Setup clusters");
+    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
+    clusterSetup.addCluster(clusterName, true);
+
+    // Registers and starts controller
+    LOG.info("Starts controller");
+    HelixManager controller =
+        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr);
+    controller.connect();
+
+    // Registers and starts participant
+    LOG.info("Starts participant");
+    String hostname = "localhost";
+    String instanceId = String.format("%s_%d", hostname, 1);
+    clusterSetup.addInstanceToCluster(clusterName, instanceId);
+    HelixManager participant =
+        HelixManagerFactory.getZKHelixManager(clusterName, instanceId, InstanceType.PARTICIPANT,
+            zkAddr);
+    participant.connect();
+
+    LOG.info("Register state machine");
+    final CountDownLatch latch = new CountDownLatch(1);
+    participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+        new StateModelFactory<StateModel>() {
+          @Override
+          public StateModel createNewStateModel(String resource, String stateUnitKey) {
+            return new SimpleStateModel(latch);
+          }
+        }, "test");
+
+    String resourceName = "test-resource";
+    LOG.info("Ideal state assignment");
+    HelixAdmin helixAdmin = participant.getClusterManagmentTool();
+    helixAdmin.addResource(clusterName, resourceName, 1, "OnlineOffline",
+        IdealState.RebalanceMode.CUSTOMIZED.toString());
+
+    IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resourceName);
+    idealState.setReplicas("1");
+    idealState.setStateModelFactoryName("test");
+    idealState.setPartitionState(resourceName + "_0", instanceId, "ONLINE");
+
+    LOG.info("Shutdown ZK server");
+    TestHelper.stopZkServer(zkServerRef.get());
+    Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          LOG.info("Restart ZK server");
+          // zkServer.set(TestUtils.startZookeeper(zkDir, zkPort));
+          zkServerRef.set(TestHelper.startZkServer(zkAddr, null, false));
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
+    }, 2L, TimeUnit.SECONDS);
+
+    // future.get();
+
+    LOG.info("Before update ideal state");
+    helixAdmin.setResourceIdealState(clusterName, resourceName, idealState);
+    LOG.info("After update ideal state");
+
+    LOG.info("Wait for OFFLINE->ONLINE state transition");
+    try {
+      Assert.assertTrue(latch.await(15, TimeUnit.SECONDS));
+
+      // wait until stable state
+      boolean result =
+          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkAddr,
+              clusterName));
+      Assert.assertTrue(result);
+
+    } finally {
+      participant.disconnect();
+      controller.disconnect();
+      zkServerRef.get().shutdown();
+    }
+  }
+
+  @Test
+  public void testHelixManagerStateListenerCallback() throws Exception {
+    final int zkPort = TestHelper.getRandomPort();
+    final String zkAddr = String.format("localhost:%d", zkPort);
+    final ZkServer zkServer = TestHelper.startZkServer(zkAddr);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    // Init onDisconnectedFlag to check if callback is triggered
+    final AtomicReference<Boolean> onDisconnectedFlag = new AtomicReference<>(false);
+    final AtomicReference<Boolean> onConnectedFlag = new AtomicReference<>(false);
+
+    // Setup cluster
+    LOG.info("Setup clusters");
+    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
+    clusterSetup.addCluster(clusterName, true);
+    // For fast test, set short timeout
+    System.setProperty("zk.connection.timeout", "2000");
+
+    // Registers and starts controller, register listener for disconnect handling
+    LOG.info("Starts controller");
+    final ZKHelixManager controller =
+        (ZKHelixManager) HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr,
+            new HelixManagerStateListener() {
+              @Override
+              public void onConnected(HelixManager helixManager) throws Exception {
+                Assert.assertEquals(helixManager.getClusterName(), clusterName);
+                onConnectedFlag.getAndSet(true);
+              }
+
+              @Override
+              public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception {
+                Assert.assertEquals(helixManager.getClusterName(), clusterName);
+                onDisconnectedFlag.getAndSet(true);
+              }
+            });
+
+    try {
+      controller.connect();
+      // check onConnected() is triggered
+      Assert.assertTrue(onConnectedFlag.getAndSet(false));
+
+      // 1. shutdown zkServer and check if handler trigger callback
+      zkServer.shutdown();
+      // Simulate a retry in ZkClient that will not succeed
+      WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
+      controller._zkclient.process(event);
+      Assert.assertFalse(controller._zkclient.waitUntilConnected(10000, TimeUnit.MILLISECONDS));
+      // While retrying, onDisconnectedFlag = false
+      Assert.assertFalse(onDisconnectedFlag.get());
+
+      // 2. restart zkServer and check if handler will recover connection
+      zkServer.start();
+      Assert.assertTrue(controller._zkclient
+          .waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+      Assert.assertTrue(controller.isConnected());
+
+      // New propertyStore should be in good state
+      ZkHelixPropertyStore propertyStore = controller.getHelixPropertyStore();
+      propertyStore.get("/", null, 0);
+
+      // Inject a session establishment error to test the handler
+      // onDisconnectedFlag should be set within onDisconnected handler
+      controller.handleSessionEstablishmentError(new Exception("For testing"));
+      Assert.assertTrue(onDisconnectedFlag.get());
+      Assert.assertFalse(onConnectedFlag.get());
+      Assert.assertFalse(controller.isConnected());
+
+      // Verify ZK is down
+      try {
+        controller.getHelixPropertyStore();
+        Assert.fail("getHelixPropertyStore() should fail once the manager is disconnected.");
+      } catch (HelixException e) {
+        // Expected exception
+        LOG.info("Expected exception: {}", e.getMessage());
+      }
+    } finally {
+      controller.disconnect();
+      zkServer.shutdown();
+      System.clearProperty("zk.connection.timeout");
+    }
+  }
+
+  public static final class SimpleStateModel extends StateModel {
+
+    private final CountDownLatch latch;
+
+    public SimpleStateModel(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      // LOG.info(HelixUtils.toString(message));
+      LOG.info("message: " + message);
+      latch.countDown();
+    }
+  }
+}

Reply via email to