Repository: ignite Updated Branches: refs/heads/ignite-zk 815a53172 -> ade6986c3
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dac5a31e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dac5a31e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dac5a31e Branch: refs/heads/ignite-zk Commit: dac5a31e6fd5499a85befc6bd0f1ae7ebd6b9e0f Parents: 33e8451 Author: sboikov <[email protected]> Authored: Fri Nov 24 13:56:38 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Nov 24 14:24:29 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 39 ++++++++- .../managers/discovery/ConsistentIdNode.java | 33 -------- .../discovery/GridDiscoveryManager.java | 14 ++-- .../managers/discovery/IgniteClusterNode.java | 69 ++++++++++++++++ .../managers/discovery/IgniteDiscoverySpi.java | 32 ++++++++ .../managers/discovery/JoiningNodesAware.java | 27 ------ .../processors/cache/GridCacheAdapter.java | 3 +- .../processors/cache/GridCacheUtils.java | 6 +- .../communication/tcp/TcpCommunicationSpi.java | 6 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 11 ++- .../tcp/internal/TcpDiscoveryNode.java | 33 ++------ .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 40 ++++++++- .../zk/internal/ZookeeperClusterNode.java | 86 ++++++++++++++++---- .../CacheMetricsForClusterGroupSelfTest.java | 6 +- .../ZookeeperDiscoverySpiBasicTest.java | 41 +++++++++- 15 files changed, 320 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 59012bd..d35b8ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -44,6 +44,7 @@ import java.util.logging.Handler; import javax.management.JMException; import javax.management.MBeanServer; import javax.management.ObjectName; +import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; @@ -162,7 +163,7 @@ public class IgnitionEx { static { if (TEST_ZK) { - zkCluster = new TestingCluster(1); + zkCluster = createTestingCluster(1); try { zkCluster.start(); @@ -173,6 +174,42 @@ public class IgnitionEx { } } + private static TestingCluster createTestingCluster(int instances) { + String tmpDir = System.getProperty("java.io.tmpdir"); + + List<InstanceSpec> specs = new ArrayList<>(); + + for (int i = 0; i < instances; i++) { + File file = new File(tmpDir, "apacheIgniteTestZk-" + i); + + if (file.isDirectory()) + deleteRecursively0(file); + else { + if (!file.mkdirs()) + throw new IgniteException("Failed to create directory for test Zookeeper server: " + file.getAbsolutePath()); + } + + + specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, -1, -1)); + } + + return new TestingCluster(specs); + } + + private static void deleteRecursively0(File file) { + File[] files = file.listFiles(); + + if (files == null) + return; + + for (File f : files) { + if (f.isDirectory()) + deleteRecursively0(f); + else + f.delete(); + } + } + /** Default configuration path relative to Ignite home. */ public static final String DFLT_CFG = "config/default-config.xml"; http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdNode.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdNode.java deleted file mode 100644 index fc806ff..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdNode.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.discovery; - -import java.io.Serializable; -import org.apache.ignite.cluster.ClusterNode; - -/** - * TODO ZK - */ -public interface ConsistentIdNode extends ClusterNode { - /** - * Sets consistent globally unique node ID which survives node restarts. - * - * @param consistentId Consistent globally unique node ID. - */ - public void setConsistentId(Serializable consistentId); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index e15df60..9396fe4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -53,8 +53,8 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.ClusterMetricsSnapshot; @@ -119,7 +119,6 @@ import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -554,8 +553,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { for (IgniteInClosure<ClusterNode> lsnr : locNodeInitLsnrs) lsnr.apply(locNode); - if (locNode instanceof ConsistentIdNode) { - final ConsistentIdNode node = (ConsistentIdNode)locNode; + if (locNode instanceof IgniteClusterNode) { + final IgniteClusterNode node = (IgniteClusterNode)locNode; if (consistentId != null) node.setConsistentId(consistentId); @@ -2177,8 +2176,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { public boolean reconnectSupported() { DiscoverySpi spi = getSpi(); - return ctx.discovery().localNode().isClient() && (spi instanceof TcpDiscoverySpi) && - !(((TcpDiscoverySpi) spi).isClientReconnectDisabled()); + return ctx.discovery().localNode().isClient() && + (spi instanceof IgniteDiscoverySpi) && + ((IgniteDiscoverySpi)spi).reconnectSupported(); } /** @@ -2191,7 +2191,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { DiscoverySpi discoverySpi = getSpi(); - ((TcpDiscoverySpi)discoverySpi).reconnect(); + ((IgniteDiscoverySpi)discoverySpi).reconnect(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java new file mode 100644 index 0000000..5aa938a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import java.io.Serializable; +import java.util.Map; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * TODO ZK + */ +public interface IgniteClusterNode extends ClusterNode { + /** + * Sets consistent globally unique node ID which survives node restarts. + * + * @param consistentId Consistent globally unique node ID. + */ + public void setConsistentId(Serializable consistentId); + + /** + * Sets node metrics. + * + * @param metrics Node metrics. + */ + public void setMetrics(ClusterMetrics metrics); + + /** + * Gets collections of cache metrics for this node. Note that node cache metrics are constantly updated + * and provide up to date information about caches. + * <p> + * Cache metrics are updated with some delay which is directly related to metrics update + * frequency. For example, by default the update will happen every {@code 2} seconds. + * + * @return Runtime metrics snapshots for this node. + */ + public Map<Integer, CacheMetrics> cacheMetrics(); + + /** + * Sets node cache metrics. + * + * @param cacheMetrics Cache metrics. + */ + public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics); + + /** + * Whether this node is cache client (see {@link IgniteConfiguration#isClientMode()}). + * + * @return {@code True if client}. + */ + public boolean isCacheClient(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java new file mode 100644 index 0000000..7418352 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import java.util.UUID; +import org.apache.ignite.spi.discovery.DiscoverySpi; + +/** + * TODO ZK + */ +public interface IgniteDiscoverySpi extends DiscoverySpi { + public boolean knownNode(UUID nodeId); + + public boolean reconnectSupported(); + + public void reconnect(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java deleted file mode 100644 index 7cd73f4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/JoiningNodesAware.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.discovery; - -import java.util.UUID; - -/** - * TODO ZK - */ -public interface JoiningNodesAware { - public boolean knownNode(UUID nodeId); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index e9c86b7..1050471 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.cluster.IgniteClusterEx; +import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; @@ -3219,7 +3220,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); for (ClusterNode node : grp.nodes()) { - Map<Integer, CacheMetrics> nodeCacheMetrics = ((TcpDiscoveryNode)node).cacheMetrics(); + Map<Integer, CacheMetrics> nodeCacheMetrics = ((IgniteClusterNode)node).cacheMetrics(); if (nodeCacheMetrics != null) { CacheMetrics e = nodeCacheMetrics.get(context().cacheId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 248f2aa..d790fa9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; @@ -90,7 +91,6 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.plugin.CachePluginConfiguration; -import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -1341,8 +1341,8 @@ public class GridCacheUtils { * @return {@code True} if given node is client node (has flag {@link IgniteConfiguration#isClientMode()} set). */ public static boolean clientNode(ClusterNode node) { - if (node instanceof TcpDiscoveryNode) - return ((TcpDiscoveryNode)node).isCacheClient(); + if (node instanceof IgniteClusterNode) + return ((IgniteClusterNode)node).isCacheClient(); else return clientNodeDirect(node); } http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 2cdb739..41600d5a 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -64,7 +64,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.discovery.JoiningNodesAware; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; import org.apache.ignite.internal.util.GridConcurrentFactory; @@ -491,8 +491,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati unknownNode = false; } } - else if (discoverySpi instanceof JoiningNodesAware) - unknownNode = !((JoiningNodesAware) discoverySpi).knownNode(sndId); + else if (discoverySpi instanceof IgniteDiscoverySpi) + unknownNode = !((IgniteDiscoverySpi) discoverySpi).knownNode(sndId); if (unknownNode) { U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 3b83b2e..0335885 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -55,6 +55,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -222,7 +223,7 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean; @IgniteSpiMultipleInstancesSupport(true) @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) -public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { +public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscoverySpi { /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; @@ -2083,6 +2084,14 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { impl.reconnect(); } + @Override public boolean knownNode(UUID nodeId) { + return getNode0(nodeId) != null; + } + + @Override public boolean reconnectSupported() { + return !clientReconnectDisabled; + } + /** * <strong>FOR TEST ONLY!!!</strong> */ http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index e5ec655..9d638f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -33,10 +33,9 @@ import java.util.UUID; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteNodeAttributes; -import org.apache.ignite.internal.managers.discovery.ConsistentIdNode; +import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -59,7 +58,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTE * <strong>This class is not intended for public use</strong> and has been made * <tt>public</tt> due to certain limitations of Java technology. */ -public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements ConsistentIdNode, +public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements IgniteClusterNode, Comparable<TcpDiscoveryNode>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -292,26 +291,14 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Consis return metrics; } - /** - * Sets node metrics. - * - * @param metrics Node metrics. - */ + /** {@inheritDoc} */ public void setMetrics(ClusterMetrics metrics) { assert metrics != null; this.metrics = metrics; } - /** - * Gets collections of cache metrics for this node. Note that node cache metrics are constantly updated - * and provide up to date information about caches. - * <p> - * Cache metrics are updated with some delay which is directly related to metrics update - * frequency. For example, by default the update will happen every {@code 2} seconds. - * - * @return Runtime metrics snapshots for this node. - */ + /** {@inheritDoc} */ public Map<Integer, CacheMetrics> cacheMetrics() { if (metricsProvider != null) { Map<Integer, CacheMetrics> cacheMetrics0 = metricsProvider.cacheMetrics(); @@ -324,11 +311,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Consis return cacheMetrics; } - /** - * Sets node cache metrics. - * - * @param cacheMetrics Cache metrics. - */ + /** {@inheritDoc} */ public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics) { this.cacheMetrics = cacheMetrics != null ? cacheMetrics : Collections.<Integer, CacheMetrics>emptyMap(); } @@ -545,11 +528,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Consis return node; } - /** - * Whether this node is cache client (see {@link IgniteConfiguration#isClientMode()}). - * - * @return {@code True if client}. - */ + /** {@inheritDoc} */ public boolean isCacheClient() { if (!cacheCliInit) { cacheCli = CU.clientNodeDirect(this); http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index ab59dc4..45c7953 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -23,13 +23,14 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.managers.discovery.JoiningNodesAware; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiAdapter; +import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; @@ -51,7 +52,7 @@ import org.jetbrains.annotations.Nullable; @IgniteSpiMultipleInstancesSupport(true) @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) -public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, JoiningNodesAware { +public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, IgniteDiscoverySpi { /** */ @GridToStringInclude private String zkConnectionString; @@ -105,6 +106,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery @GridToStringExclude private IgniteLogger log; + /** */ + private boolean clientReconnectDisabled; + public String getBasePath() { return basePath; } @@ -145,6 +149,35 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery return this; } + /** + * If {@code true} client does not try to reconnect. + * + * @return Client reconnect disabled flag. + */ + public boolean isClientReconnectDisabled() { + return clientReconnectDisabled; + } + + /** + * Sets client reconnect disabled flag. + * + * @param clientReconnectDisabled Client reconnect disabled flag. + */ + @IgniteSpiConfiguration(optional = true) + public void setClientReconnectDisabled(boolean clientReconnectDisabled) { + this.clientReconnectDisabled = clientReconnectDisabled; + } + + /** {@inheritDoc} */ + @Override public boolean reconnectSupported() { + return !clientReconnectDisabled; + } + + /** {@inheritDoc} */ + @Override public void reconnect() { + // TODO ZK + } + /** {@inheritDoc} */ @Override public boolean knownNode(UUID nodeId) { return impl.knownNode(nodeId); @@ -291,7 +324,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery locNodeVer, locNodeAttrs, consistentId, - ignite.configuration().isClientMode()); + ignite.configuration().isClientMode(), + metricsProvider); locNode.local(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java index 504a1b1..f2f0362 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java @@ -23,15 +23,17 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON; @@ -40,11 +42,14 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTE /** * */ -public class ZookeeperClusterNode implements ClusterNode, Serializable { +public class ZookeeperClusterNode implements IgniteClusterNode, Serializable { /** */ private static final long serialVersionUID = 0L; /** */ + private static final byte CLIENT_NODE_MASK = 0x01; + + /** */ private UUID id; /** */ @@ -63,14 +68,22 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable { @GridToStringExclude private Map<String, Object> attrs; + /** Metrics provider (transient). */ + @GridToStringExclude + private transient DiscoveryMetricsProvider metricsProvider; + /** */ private transient boolean loc; - /** TODO ZK */ - private transient ClusterMetrics metrics; + /** */ + private transient volatile ClusterMetrics metrics; + + /** Node cache metrics. */ + @GridToStringExclude + private transient volatile Map<Integer, CacheMetrics> cacheMetrics; /** */ - private boolean client; + private byte flags; /** Daemon node flag. */ @GridToStringExclude @@ -87,11 +100,14 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable { * @param consistentId Consistent ID. * @param client Client node flag. */ - public ZookeeperClusterNode(UUID id, + public ZookeeperClusterNode( + UUID id, IgniteProductVersion ver, Map<String, Object> attrs, Serializable consistentId, - boolean client) { + boolean client, + DiscoveryMetricsProvider metricsProvider + ) { assert id != null; assert consistentId != null; @@ -99,7 +115,10 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable { this.ver = ver; this.attrs = U.sealMap(attrs); this.consistentId = consistentId; - this.client = client; + this.metricsProvider = metricsProvider; + + if (client) + flags |= CLIENT_NODE_MASK; } /** {@inheritDoc} */ @@ -112,11 +131,7 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable { return consistentId; } - /** - * Sets consistent globally unique node ID which survives node restarts. - * - * @param consistentId Consistent globally unique node ID. - */ + /** {@inheritDoc} */ public void setConsistentId(Serializable consistentId) { this.consistentId = consistentId; @@ -128,6 +143,11 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable { } /** {@inheritDoc} */ + @Override public boolean isCacheClient() { + return isClient(); + } + + /** {@inheritDoc} */ @Nullable @Override public <T> T attribute(String name) { // Even though discovery SPI removes this attribute after authentication, keep this check for safety. if (IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name)) @@ -138,13 +158,47 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable { /** {@inheritDoc} */ @Override public ClusterMetrics metrics() { + if (metricsProvider != null) { + ClusterMetrics metrics0 = metricsProvider.metrics(); + + metrics = metrics0; + + return metrics0; + } + + // TODO: ZK if (metrics == null) - metrics = new ClusterMetricsSnapshot(); + return new ClusterMetricsSnapshot(); return metrics; } /** {@inheritDoc} */ + public void setMetrics(ClusterMetrics metrics) { + assert metrics != null; + + this.metrics = metrics; + } + + /** {@inheritDoc} */ + @Override public Map<Integer, CacheMetrics> cacheMetrics() { + if (metricsProvider != null) { + Map<Integer, CacheMetrics> cacheMetrics0 = metricsProvider.cacheMetrics(); + + cacheMetrics = cacheMetrics0; + + return cacheMetrics0; + } + + return cacheMetrics; + } + + /** {@inheritDoc} */ + public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics) { + this.cacheMetrics = cacheMetrics != null ? cacheMetrics : Collections.<Integer, CacheMetrics>emptyMap(); + } + + /** {@inheritDoc} */ @Override public Map<String, Object> attributes() { // Even though discovery SPI removes this attribute after authentication, keep this check for safety. return F.view(attrs, new IgnitePredicate<String>() { @@ -219,7 +273,7 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable { /** {@inheritDoc} */ @Override public boolean isClient() { - return client; + return (CLIENT_NODE_MASK & flags) != 0; } /** {@inheritDoc} */ @@ -234,6 +288,6 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable { /** {@inheritDoc} */ @Override public String toString() { - return "ZookeeperClusterNode [id=" + id + ", order=" + order + ", client=" + client + ']'; + return "ZookeeperClusterNode [id=" + id + ", order=" + order + ", client=" + isClient() + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java index aefbc23..d7b831b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java @@ -27,9 +27,9 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; @@ -103,7 +103,7 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest Collection<ClusterNode> nodes = grid(0).cluster().forRemotes().nodes(); for (ClusterNode node : nodes) { - Map<Integer, CacheMetrics> metrics = ((TcpDiscoveryNode)node).cacheMetrics(); + Map<Integer, CacheMetrics> metrics = ((IgniteClusterNode)node).cacheMetrics(); assertNotNull(metrics); assertFalse(metrics.isEmpty()); } @@ -134,7 +134,7 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest Collection<ClusterNode> nodes = grid(0).cluster().forRemotes().nodes(); for (ClusterNode node : nodes) { - Map<Integer, CacheMetrics> metrics = ((TcpDiscoveryNode) node).cacheMetrics(); + Map<Integer, CacheMetrics> metrics = ((IgniteClusterNode)node).cacheMetrics(); assertNotNull(metrics); assertTrue(metrics.isEmpty()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/dac5a31e/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index a9f0d7f..4ccafa2 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.zk.internal; +import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -31,9 +32,11 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -186,12 +189,48 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { IgnitionEx.TEST_ZK = false; if (USE_TEST_CLUSTER) { - zkCluster = new TestingCluster(ZK_SRVS); + zkCluster = createTestingCluster(ZK_SRVS); zkCluster.start(); } } + private static TestingCluster createTestingCluster(int instances) { + String tmpDir = System.getProperty("java.io.tmpdir"); + + List<InstanceSpec> specs = new ArrayList<>(); + + for (int i = 0; i < instances; i++) { + File file = new File(tmpDir, "apacheIgniteTestZk-" + i); + + if (file.isDirectory()) + deleteRecursively0(file); + else { + if (!file.mkdirs()) + throw new IgniteException("Failed to create directory for test Zookeeper server: " + file.getAbsolutePath()); + } + + + specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, -1, -1)); + } + + return new TestingCluster(specs); + } + + private static void deleteRecursively0(File file) { + File[] files = file.listFiles(); + + if (files == null) + return; + + for (File f : files) { + if (f.isDirectory()) + deleteRecursively0(f); + else + f.delete(); + } + } + /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { if (zkCluster != null) {
