Repository: nifi Updated Branches: refs/heads/master 2d6bba080 -> 78de10dec
NIFI-3150 Added logic to wait for the zk client to connect to the configured server Signed-off-by: Bryan Rosander <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/78de10de Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/78de10de Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/78de10de Branch: refs/heads/master Commit: 78de10dec0127598fbbbb8ac7f7048cec3aecb6b Parents: 2d6bba0 Author: Jeff Storck <[email protected]> Authored: Mon Dec 5 13:19:31 2016 -0500 Committer: Bryan Rosander <[email protected]> Committed: Mon Dec 5 22:06:00 2016 -0500 ---------------------------------------------------------------------- .../toolkit/zkmigrator/ZooKeeperMigrator.java | 36 +++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/78de10de/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java index c15286e..c108523 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java @@ -27,6 +27,7 @@ import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonWriter; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; @@ -46,7 +47,9 @@ import java.util.List; import java.util.Spliterators; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -132,7 +135,7 @@ class ZooKeeperMigrator { LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceZooKeeperEndpointConfig); Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath()), "Source ZooKeeper %s from %s is invalid", sourceZooKeeperEndpointConfig, zkData); - Preconditions.checkArgument( !(zooKeeperEndpointConfig.equals(sourceZooKeeperEndpointConfig) && !ignoreSource), + Preconditions.checkArgument(!(zooKeeperEndpointConfig.equals(sourceZooKeeperEndpointConfig) && !ignoreSource), "Source ZooKeeper config %s for the data provided can not be the same as the configured destination ZooKeeper config %s", sourceZooKeeperEndpointConfig, zooKeeperEndpointConfig); @@ -284,14 +287,45 @@ class ZooKeeperMigrator { } private ZooKeeper getZooKeeper(ZooKeeperEndpointConfig zooKeeperEndpointConfig, AuthMode authMode, byte[] authData) throws IOException { + CountDownLatch connectionLatch = new CountDownLatch(1); ZooKeeper zooKeeper = new ZooKeeper(zooKeeperEndpointConfig.getConnectString(), 3000, watchedEvent -> { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), zooKeeperEndpointConfig); + } + if (watchedEvent.getType().equals(Watcher.Event.EventType.None) && watchedEvent.getState().equals(Watcher.Event.KeeperState.SyncConnected)) { + connectionLatch.countDown(); + } }); + + final boolean connected; + try { + connected = connectionLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + closeZooKeeper(zooKeeper); + Thread.currentThread().interrupt(); // preserve interrupt + throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperEndpointConfig), e); + } + + if (!connected) { + closeZooKeeper(zooKeeper); + throw new IOException(String.format("unable to connect to %s, state is %s", zooKeeperEndpointConfig, zooKeeper.getState())); + } + if (authMode.equals(AuthMode.DIGEST)) { zooKeeper.addAuthInfo(SCHEME_DIGEST, authData); } return zooKeeper; } + private void closeZooKeeper(ZooKeeper zooKeeper) { + try { + zooKeeper.close(); + } catch (InterruptedException e) { + LOGGER.warn("could not close ZooKeeper client due to interrupt", e); + Thread.currentThread().interrupt(); // preserve interrupt + } + } + ZooKeeperEndpointConfig getZooKeeperEndpointConfig() { return zooKeeperEndpointConfig; }
