Repository: ignite Updated Branches: refs/heads/ignite-zk 48175cf3b -> 6c1fe28c7
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c1fe28c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c1fe28c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c1fe28c Branch: refs/heads/ignite-zk Commit: 6c1fe28c70e677619842f57af09ce182daf3b06e Parents: 48175cf Author: sboikov <[email protected]> Authored: Fri Nov 10 15:27:04 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Nov 10 17:13:32 2017 +0300 ---------------------------------------------------------------------- .../marshaller/MarshallerMappingTransport.java | 7 + .../spi/discovery/zk/ZookeeperClusterNode.java | 12 +- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 157 +++++++++++++++---- .../zk/ZookeeperDiscoverySpiBasicTest.java | 104 +++++++++++- 4 files changed, 246 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6c1fe28c/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java index b80ae36..e77d4f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java @@ -99,6 +99,13 @@ public final class MarshallerMappingTransport { public GridFutureAdapter<MappingExchangeResult> proposeMapping(MarshallerMappingItem item, ConcurrentMap<Integer, MappedName> cache) throws IgniteCheckedException { GridFutureAdapter<MappingExchangeResult> fut = new MappingExchangeResultFuture(item); + // TODO ZK + if (true) { + fut.onDone(MappingExchangeResult.createExchangeDisabledResult()); + + return fut; + } + GridFutureAdapter<MappingExchangeResult> oldFut = mappingExchSyncMap.putIfAbsent(item, fut); if (oldFut != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/6c1fe28c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java index 8c28df4..1f16865 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java @@ -59,19 +59,24 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable { /** */ private transient boolean loc; - /** TODO */ + /** TODO ZK */ private transient ClusterMetrics metrics; + /** */ + private boolean client; + /** * @param id Node ID. * @param ver Node version. * @param attrs Node attributes. * @param consistentId Consistent ID. + * @param client Client node flag. */ public ZookeeperClusterNode(UUID id, IgniteProductVersion ver, Map<String, Object> attrs, - Serializable consistentId) { + Serializable consistentId, + boolean client) { assert id != null; assert consistentId != null; @@ -79,6 +84,7 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable { this.ver = ver; this.attrs = U.sealMap(attrs); this.consistentId = consistentId; + this.client = client; } /** {@inheritDoc} */ @@ -181,7 +187,7 @@ public class ZookeeperClusterNode implements ClusterNode, Serializable { /** {@inheritDoc} */ @Override public boolean isClient() { - return false; + return client; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6c1fe28c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index c5dd0dc..c267cf2 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -41,6 +41,7 @@ import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; +import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; @@ -83,6 +84,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery private static final String ALIVE_NODES_PATH = CLUSTER_PATH + "/alive"; /** */ + private static final byte[] EMPTY_BYTES = new byte[0]; + + /** */ private String connectString; /** */ @@ -173,7 +177,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Override public Collection<ClusterNode> getRemoteNodes() { - // TODO + // TODO ZK List<ClusterNode> nodes; synchronized (curTop) { @@ -195,7 +199,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Nullable @Override public ClusterNode getNode(UUID nodeId) { - // TODO + // TODO ZK synchronized (curTop) { for (ClusterNode node : curTop.values()) { if (node.id().equals(nodeId)) @@ -208,7 +212,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Override public boolean pingNode(UUID nodeId) { - // TODO + // TODO ZK return getNode(nodeId) != null; } @@ -243,12 +247,12 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Override public void disconnect() throws IgniteSpiException { - // TODO + // TODO ZK } /** {@inheritDoc} */ @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) { - // TODO + // TODO ZK } /** {@inheritDoc} */ @@ -258,19 +262,19 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { - // TODO + // TODO ZK throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public void failNode(UUID nodeId, @Nullable String warning) { - // TODO + // TODO ZK throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public boolean isClientMode() throws IllegalStateException { - // TODO + // TODO ZK return false; } @@ -283,7 +287,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery locNode = new ZookeeperClusterNode(ignite.configuration().getNodeId(), locNodeVer, locNodeAttrs, - consistentId()); + consistentId(), + ignite.configuration().isClientMode()); locNode.local(true); @@ -301,9 +306,23 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery try { initLocalNode(); - zk = new ZooKeeper(connectString, sesTimeout, zkWatcher); + DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id()); + + exchange.collect(discoDataBag); + + String threadName = Thread.currentThread().getName(); + + // ZK generates internal threads' names using current thread name. + Thread.currentThread().setName("zk-" + igniteInstanceName); + + try { + zk = new ZooKeeper(connectString, sesTimeout, zkWatcher); + } + finally { + Thread.currentThread().setName(threadName); + } - // TODO: properly handle first node start and init after full cluster restart. + // TODO ZK: properly handle first node start and init after full cluster restart. if (zk.exists(IGNITE_PATH, false) == null) { log.info("Initialize Zookeeper nodes."); @@ -311,11 +330,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery ZKClusterData clusterData = new ZKClusterData(U.currentTimeMillis()); - initOps.add(Op.create(IGNITE_PATH, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + initOps.add(Op.create(IGNITE_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); initOps.add(Op.create(CLUSTER_PATH, marshal(clusterData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(JOIN_HIST_PATH, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(ALIVE_NODES_PATH, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(EVENTS_PATH, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + initOps.add(Op.create(JOIN_HIST_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + initOps.add(Op.create(ALIVE_NODES_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + initOps.add(Op.create(EVENTS_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); zk.multi(initOps); } @@ -330,12 +349,14 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery List<Op> joinOps = new ArrayList<>(); - byte[] nodeData = marshal(locNode); + ZKJoiningNodeData joinData = new ZKJoiningNodeData(locNode, discoDataBag.joiningNodeData()); + + byte[] nodeData = marshal(joinData); String zkNode = "/" + locNode.id().toString() + "-"; joinOps.add(Op.create(JOIN_HIST_PATH + zkNode, nodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL)); - joinOps.add(Op.create(ALIVE_NODES_PATH + zkNode, nodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL)); + joinOps.add(Op.create(ALIVE_NODES_PATH + zkNode, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL)); List<OpResult> res = zk.multi(joinOps); @@ -402,7 +423,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery final UUID nodeId; /** */ - transient ZookeeperClusterNode clusterNode; + transient ZKJoiningNodeData joinData; /** * @param order Node order. @@ -496,16 +517,20 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery if (joinHist.put(data.order, data) == null) { try { - byte[] nodeData = zk.getData(path + "/" + child, null, null); + byte[] bytes = zk.getData(path + "/" + child, null, null); + + assert bytes.length > 0; - assert nodeData.length > 0; + ZKJoiningNodeData joinData = unmarshal(bytes); - data.clusterNode = unmarshal(nodeData); + assert joinData.node != null; - data.clusterNode.order(data.order); + joinData.node.order(data.order); + + data.joinData = joinData; } catch (Exception e) { - // TODO + // TODO ZK U.error(log, "Failed to get node data: " + e, e); } } @@ -591,12 +616,32 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery ZKNodeData data = joinHist.get(nextJoinOrder); if (data != null) { - curTop.put(data.clusterNode.order(), data.clusterNode); + ZKJoiningNodeData joinData = data.joinData; + + assert joinData != null : data; + + curTop.put(joinData.node.order(), joinData.node); - evts.put(v, new ZKDiscoveryEvent(EventType.EVT_NODE_JOINED, + ZKDiscoveryEvent joinEvt = new ZKDiscoveryEvent(EventType.EVT_NODE_JOINED, v, - data.clusterNode, - new ArrayList<>(curTop.values()))); + joinData.node, + new ArrayList<>(curTop.values())); + + if (!joinData.node.id().equals(locNode.nodeId)) { + DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(joinData.node.id()); + + joiningNodeBag.joiningNodeData(joinData.joiningNodeData); + + exchange.onExchange(joiningNodeBag); + + DiscoveryDataBag collectBag = new DiscoveryDataBag(joinData.node.id(), new HashSet<Integer>()); + + exchange.collect(collectBag); + + joinEvt.discoveryData(joinData.joiningNodeData, collectBag.commonData()); + } + + evts.put(v, joinEvt); if (!newNodes.nodesByOrder.containsKey(data.order)) { v++; @@ -700,11 +745,15 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery boolean locJoin = false; if (lastEvt == null) { - locNode.order(e.node.order()); - locJoin = e.evtType == EventType.EVT_NODE_JOINED && e.node.id().equals(locNode.id()); - fireEvt = locJoin; + if (locJoin) { + locNode.order(e.node.order()); + + fireEvt = true; + } + else + fireEvt = false; } else fireEvt = e.topVer > lastEvt.topVer; @@ -721,12 +770,26 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery assert old == null : node; } + + DiscoveryDataBag dataBag = new DiscoveryDataBag(e.node.id()); + + dataBag.joiningNodeData(e.joiningNodeData); + dataBag.commonData(e.commonData); + + exchange.onExchange(dataBag); } else { switch (e.evtType) { case EventType.EVT_NODE_JOINED: { ZookeeperClusterNode node = e.node; + DiscoveryDataBag dataBag = new DiscoveryDataBag(e.node.id()); + + dataBag.joiningNodeData(e.joiningNodeData); + dataBag.commonData(e.commonData); + + exchange.onExchange(dataBag); + Object old = curTop.put(node.order(), node); assert old == null : node; @@ -836,6 +899,12 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery @GridToStringInclude final int topVer; + /** */ + Map<Integer, Serializable> joiningNodeData; + + /** */ + Map<Integer, Serializable> commonData; + /** * @param evtType * @param topVer @@ -849,6 +918,14 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery this.allNodes = allNodes; } + /** + * + */ + void discoveryData(Map<Integer, Serializable> joiningNodeData, Map<Integer, Serializable> commonData) { + this.joiningNodeData = joiningNodeData; + this.commonData = commonData; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ZKDiscoveryEvent.class, this); @@ -886,4 +963,24 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery this.gridStartTime = gridStartTime; } } + + /** + * + */ + private static class ZKJoiningNodeData implements Serializable { + /** */ + private final ZookeeperClusterNode node; + + /** */ + private final Map<Integer, Serializable> joiningNodeData; + + /** + * @param node Node. + * @param joiningNodeData Discovery data. + */ + ZKJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Serializable> joiningNodeData) { + this.node = node; + this.joiningNodeData = joiningNodeData; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6c1fe28c/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index 7197947..5078744 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -21,10 +21,14 @@ import java.util.List; import org.apache.curator.test.TestingCluster; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -49,6 +53,14 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(zkSpi); + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + cfg.setMarshaller(new JdkMarshaller()); + return cfg; } @@ -75,7 +87,55 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testStartStop() throws Exception { + public void testStartStop_1_Node() throws Exception { + startGrid(0); + + waitForTopology(1); + + stopGrid(0); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop_2_Nodes_WithCache() throws Exception { + startGrids(2); + + for (Ignite node : G.allGrids()) { + IgniteCache cache = node.cache(DEFAULT_CACHE_NAME); + + assertNotNull(cache); + + for (int i = 0; i < 100; i++) { + cache.put(i, node.name()); + + assertEquals(node.name(), cache.get(i)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testStartStop_2_Nodes() throws Exception { + startGrid(0); + + waitForTopology(1); + + startGrid(1); + + waitForTopology(2); + + for (Ignite node : G.allGrids()) + node.compute().broadcast(new DummyCallable(null)); + + awaitPartitionMapExchange(); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop1() throws Exception { startGridsMultiThreaded(5, false); waitForTopology(5); @@ -83,8 +143,30 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { stopGrid(0); waitForTopology(4); + + for (Ignite node : G.allGrids()) + node.compute().broadcast(new DummyCallable(null)); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop2() throws Exception { + startGridsMultiThreaded(10, false); + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + stopGrid(idx); + } + }, 3, "stop-node-thread"); + + waitForTopology(7); } + /** + * @param expSize Expected nodes number. + * @throws Exception If failed. + */ private void waitForTopology(final int expSize) throws Exception { assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { @@ -110,4 +192,24 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } }, 5000)); } + + /** + * + */ + private static class DummyCallable implements IgniteCallable<Object> { + /** */ + private byte[] data; + + /** + * @param data Data. + */ + DummyCallable(byte[] data) { + this.data = data; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return data; + } + } }
