Repository: ignite Updated Branches: refs/heads/ignite-zk 308bef23b -> 0918da57c
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0918da57 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0918da57 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0918da57 Branch: refs/heads/ignite-zk Commit: 0918da57ca7439c333566c3404531ff6f0e2312a Parents: 308bef2 Author: sboikov <[email protected]> Authored: Fri Dec 29 14:58:31 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 29 15:58:53 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 6 ++ .../processors/cache/GridCacheProcessor.java | 4 +- .../discovery/zk/internal/ZookeeperClient.java | 71 ++++++++++++++------ .../zk/internal/ZookeeperDiscoveryImpl.java | 12 ++++ .../cache/GridCacheAbstractSelfTest.java | 10 ++- .../CacheVersionedEntryAbstractTest.java | 33 +++------ .../zk/internal/ZookeeperDiscoverySpiTest.java | 54 +++++++++++++++ 7 files changed, 144 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index b4345b9..36b3ce6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -102,6 +102,7 @@ import org.apache.ignite.spi.checkpoint.noop.NoopCheckpointSpi; import org.apache.ignite.spi.collision.noop.NoopCollisionSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.deployment.local.LocalDeploymentSpi; +import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; @@ -2294,6 +2295,11 @@ public class IgnitionEx { ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); + DiscoverySpi spi = myCfg.getDiscoverySpi(); + + if (spi instanceof TcpDiscoverySpi) + zkSpi.setClientReconnectDisabled(((TcpDiscoverySpi)spi).isClientReconnectDisabled()); + zkSpi.setSessionTimeout(20_000); zkSpi.setZkConnectionString(zkCluster.getConnectString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 30033d3..4bf1ce6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -3250,7 +3250,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Validation result or {@code null} in case of success. */ @Nullable private IgniteNodeValidationResult validateHashIdResolvers(ClusterNode node) { - if (!node.isClient()) { + if (!CU.clientNode(node)) { for (DynamicCacheDescriptor desc : cacheDescriptors().values()) { CacheConfiguration cfg = desc.cacheConfiguration(); @@ -3259,7 +3259,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { Object nodeHashObj = aff.resolveNodeHash(node); - for (ClusterNode topNode : ctx.discovery().allNodes()) { + for (ClusterNode topNode : ctx.discovery().aliveServerNodes()) { Object topNodeHashObj = aff.resolveNodeHash(topNode); if (nodeHashObj.hashCode() == topNodeHashObj.hashCode()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index df0bb43..f5ecf52 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -47,6 +47,9 @@ public class ZookeeperClient implements Watcher { IgniteSystemProperties.getLong("IGNITE_ZOOKEEPER_DISCOVERY_RETRY_TIMEOUT", 1000); /** */ + private static final int MAX_REQ_SIZE = 1048528; + + /** */ private static final List<ACL> ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; /** */ @@ -142,6 +145,15 @@ public class ZookeeperClient implements Watcher { return zk; } + /** + * @return {@code True} if connected to ZooKeeper. + */ + boolean connected() { + synchronized (stateMux) { + return state == ConnectionState.Connected; + } + } + /** {@inheritDoc} */ @Override public void process(WatchedEvent evt) { if (closing) @@ -179,14 +191,14 @@ public class ZookeeperClient implements Watcher { break; default: - U.error(log, "Unexpected state for zookeeper client, close connection: " + zkState); + U.error(log, "Unexpected state for ZooKeeper client, close connection: " + zkState); newState = ConnectionState.Lost; } if (newState != state) { if (log.isInfoEnabled()) - log.info("Zookeeper client state changed [prevState=" + state + ", newState=" + newState + ']'); + log.info("ZooKeeper client state changed [prevState=" + state + ", newState=" + newState + ']'); state = newState; @@ -278,9 +290,6 @@ public class ZookeeperClient implements Watcher { } - /** */ - private static final int MAX_REQ_SIZE = 1048528; - /** * @param path Path. * @param data Data. @@ -291,6 +300,12 @@ public class ZookeeperClient implements Watcher { return requestOverhead(path) + data.length + overhead > MAX_REQ_SIZE; } + /** + * @param path Path. + * @param data Data. + * @param overhead Extra overhead. + * @return Splitted data. + */ List<byte[]> splitNodeData(String path, byte[] data, int overhead) { int partSize = MAX_REQ_SIZE - requestOverhead(path) - overhead; @@ -447,6 +462,21 @@ public class ZookeeperClient implements Watcher { /** * @param path Path. + * @param ver Expected version. + * @throws InterruptedException If interrupted. + * @throws KeeperException In case of error. + */ + void deleteIfExistsNoRetry(String path, int ver) throws InterruptedException, KeeperException { + try { + zk.delete(path, ver); + } + catch (KeeperException.NoNodeException e) { + // No-op if znode does not exist. + } + } + + /** + * @param path Path. * @param ver Version. * @throws ZookeeperClientFailedException If connection to zk was lost. * @throws InterruptedException If interrupted. @@ -458,7 +488,7 @@ public class ZookeeperClient implements Watcher { delete(path, ver); } catch (KeeperException.NoNodeException e) { - // No-op if node does not exist. + // No-op if znode does not exist. } } @@ -493,6 +523,9 @@ public class ZookeeperClient implements Watcher { return; } + catch (KeeperException.NoNodeException e) { + throw e; + } catch (Exception e) { onZookeeperError(connStartTime, e); } @@ -532,9 +565,12 @@ public class ZookeeperClient implements Watcher { * @param ver Version. * @throws ZookeeperClientFailedException If connection to zk was lost. * @throws InterruptedException If interrupted. + * @throws KeeperException.NoNodeException If node does not exist. + * @throws KeeperException.BadVersionException If version does not match. */ void setData(String path, byte[] data, int ver) - throws ZookeeperClientFailedException, InterruptedException, KeeperException.NoNodeException, KeeperException.BadVersionException + throws ZookeeperClientFailedException, InterruptedException, KeeperException.NoNodeException, + KeeperException.BadVersionException { if (data == null) data = EMPTY_BYTES; @@ -547,10 +583,7 @@ public class ZookeeperClient implements Watcher { return; } - catch (KeeperException.NoNodeException e) { - throw e; - } - catch (KeeperException.BadVersionException e) { + catch (KeeperException.BadVersionException | KeeperException.NoNodeException e) { throw e; } catch (Exception e) { @@ -683,9 +716,9 @@ public class ZookeeperClient implements Watcher { synchronized (stateMux) { if (closing) - throw new ZookeeperClientFailedException("Zookeeper client is closed."); + throw new ZookeeperClientFailedException("ZooKeeper client is closed."); - U.warn(log, "Failed to execute zookeeper operation [err=" + e + ", state=" + state + ']'); + U.warn(log, "Failed to execute ZooKeeper operation [err=" + e + ", state=" + state + ']'); if (state == ConnectionState.Lost) { U.error(log, "Operation failed with unexpected error, connection lost: " + e, e); @@ -715,7 +748,7 @@ public class ZookeeperClient implements Watcher { if (remainingTime <= 0) { state = ConnectionState.Lost; - U.warn(log, "Failed to establish zookeeper connection, close client " + + U.warn(log, "Failed to establish ZooKeeper connection, close client " + "[timeout=" + connLossTimeout + ']'); err = new ZookeeperClientFailedException(e); @@ -723,7 +756,7 @@ public class ZookeeperClient implements Watcher { } if (err == null) { - U.warn(log, "Zookeeper operation failed, will retry [err=" + e + + U.warn(log, "ZooKeeper operation failed, will retry [err=" + e + ", retryTimeout=" + RETRY_TIMEOUT + ", connLossTimeout=" + connLossTimeout + ", path=" + ((KeeperException)e).getPath() + @@ -732,11 +765,11 @@ public class ZookeeperClient implements Watcher { stateMux.wait(RETRY_TIMEOUT); if (closing) - throw new ZookeeperClientFailedException("Zookeeper client is closed."); + throw new ZookeeperClientFailedException("ZooKeeper client is closed."); } } else { - U.error(log, "Operation failed with unexpected error, close client: " + e, e); + U.error(log, "Operation failed with unexpected error, close ZooKeeper client: " + e, e); state = ConnectionState.Lost; @@ -771,7 +804,7 @@ public class ZookeeperClient implements Watcher { zk.close(); } catch (Exception closeErr) { - U.warn(log, "Failed to close zookeeper client: " + closeErr, closeErr); + U.warn(log, "Failed to close ZooKeeper client: " + closeErr, closeErr); } connTimer.cancel(); @@ -1116,7 +1149,7 @@ public class ZookeeperClient implements Watcher { state = ConnectionState.Lost; - U.warn(log, "Failed to establish zookeeper connection, close client " + + U.warn(log, "Failed to establish ZooKeeper connection, close client " + "[timeout=" + connLossTimeout + ']'); connLoss = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 10755ad..10d8061 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -3607,6 +3607,18 @@ public class ZookeeperDiscoveryImpl { if (!stop.compareAndSet(false, true)) return; + ZkRuntimeState rtState = this.rtState; + + if (rtState.zkClient != null && rtState.locNodeZkPath != null && rtState.zkClient.connected()) { + try { + rtState.zkClient.deleteIfExistsNoRetry(rtState.locNodeZkPath, -1); + } + catch (Exception err) { + if (log.isDebugEnabled()) + log.debug("Failed to delete local node's znode on stop: " + err); + } + } + IgniteCheckedException err = new IgniteCheckedException("Node stopped."); synchronized (stateMux) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java index 7a8453c..80ffdef 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java @@ -95,9 +95,15 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { initStoreStrategy(); - startGridsMultiThreaded(1, cnt); + IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(0)); - startGrid(0); + if (cfg.isClientMode() != null && cfg.isClientMode()) { + startGridsMultiThreaded(1, cnt); + + startGrid(getTestIgniteInstanceName(0), cfg, null); + } + else + startGrids(cnt); awaitPartitionMapExchange(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java index 61ceef7..16ea848 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryAbstractTest.java @@ -18,11 +18,12 @@ package org.apache.ignite.internal.processors.cache.version; import java.util.HashSet; +import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntry; @@ -56,23 +57,15 @@ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractS public void testInvoke() throws Exception { Cache<Integer, String> cache = grid(0).cache(DEFAULT_CACHE_NAME); - final AtomicInteger invoked = new AtomicInteger(); - - cache.invoke(100, new EntryProcessor<Integer, String, Object>() { - @Override public Object process(MutableEntry<Integer, String> entry, Object... arguments) - throws EntryProcessorException { - - invoked.incrementAndGet(); - + assertNotNull(cache.invoke(100, new EntryProcessor<Integer, String, Object>() { + @Override public Object process(MutableEntry<Integer, String> entry, Object... args) { CacheEntry<Integer, String> verEntry = entry.unwrap(CacheEntry.class); checkVersionedEntry(verEntry); - return entry; + return verEntry.version(); } - }); - - assert invoked.get() > 0; + })); } /** @@ -86,23 +79,17 @@ public abstract class CacheVersionedEntryAbstractTest extends GridCacheAbstractS for (int i = 0; i < ENTRIES_NUM; i++) keys.add(i); - final AtomicInteger invoked = new AtomicInteger(); - - cache.invokeAll(keys, new EntryProcessor<Integer, String, Object>() { - @Override public Object process(MutableEntry<Integer, String> entry, Object... arguments) - throws EntryProcessorException { - - invoked.incrementAndGet(); - + Map<Integer, EntryProcessorResult<Object>> res = cache.invokeAll(keys, new EntryProcessor<Integer, String, Object>() { + @Override public Object process(MutableEntry<Integer, String> entry, Object... args) { CacheEntry<Integer, String> verEntry = entry.unwrap(CacheEntry.class); checkVersionedEntry(verEntry); - return null; + return verEntry.version(); } }); - assert invoked.get() > 0; + assertEquals(ENTRIES_NUM, res.size()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0918da57/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java index 314e3ab..0e7141a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -1643,6 +1643,60 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testMultipleClusters() throws Exception { + Ignite c0 = startGrid(0); + + zkRootPath = "/cluster2"; + + Ignite c1 = startGridsMultiThreaded(1, 5); + + zkRootPath = "/cluster3"; + + Ignite c2 = startGridsMultiThreaded(6, 3); + + checkNodesNumber(c0, 1); + checkNodesNumber(c1, 5); + checkNodesNumber(c2, 3); + + stopGrid(2); + + checkNodesNumber(c0, 1); + checkNodesNumber(c1, 4); + checkNodesNumber(c2, 3); + + for (int i = 0; i < 3; i++) + stopGrid(i + 6); + + checkNodesNumber(c0, 1); + checkNodesNumber(c1, 4); + + c2 = startGridsMultiThreaded(6, 2); + + checkNodesNumber(c0, 1); + checkNodesNumber(c1, 4); + checkNodesNumber(c2, 2); + + evts.clear(); + } + + /** + * @param node Node. + * @param expNodes Expected node in cluster. + * @throws Exception If failed. + */ + private void checkNodesNumber(final Ignite node, final int expNodes) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return node.cluster().nodes().size() == expNodes; + } + }, 5000); + + assertEquals(expNodes, node.cluster().nodes().size()); + } + + /** + * @throws Exception If failed. + */ public void testStartStop1() throws Exception { ackEveryEventSystemProperty();
