Repository: ignite Updated Branches: refs/heads/ignite-zk d977dcb81 -> df49a51b5
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/df49a51b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/df49a51b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/df49a51b Branch: refs/heads/ignite-zk Commit: df49a51b59b9411b900db38604d009ff5e7dc9c8 Parents: d977dcb Author: sboikov <[email protected]> Authored: Fri Dec 22 15:45:14 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 22 17:00:48 2017 +0300 ---------------------------------------------------------------------- .../discovery/zk/internal/ZkClusterNodes.java | 4 +- .../ZkCommunicationErrorResolveResult.java | 6 +- .../zk/internal/ZkDiscoveryEventData.java | 6 +- .../zk/internal/ZkDiscoveryEventsData.java | 27 +- .../internal/ZkDiscoveryNodeFailEventData.java | 6 +- .../internal/ZkDiscoveryNodeJoinEventData.java | 4 +- .../discovery/zk/internal/ZkIgnitePaths.java | 2 +- .../zk/internal/ZkInternalJoinErrorMessage.java | 4 +- .../discovery/zk/internal/ZkRuntimeState.java | 7 +- .../discovery/zk/internal/ZkTimeoutObject.java | 3 + .../zk/internal/ZookeeperClusterNode.java | 6 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 268 +++++++++++-------- .../ZookeeperDiscoverySpiBasicTest.java | 57 +++- 13 files changed, 269 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java index 4c114a3..70bdc3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java @@ -33,7 +33,7 @@ public class ZkClusterNodes { final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByOrder = new ConcurrentSkipListMap<>(); /** */ - final ConcurrentSkipListMap<Integer, ZookeeperClusterNode> nodesByInternalId = new ConcurrentSkipListMap<>(); + final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByInternalId = new ConcurrentSkipListMap<>(); /** */ final ConcurrentHashMap<UUID, ZookeeperClusterNode> nodesById = new ConcurrentHashMap<>(); @@ -84,7 +84,7 @@ public class ZkClusterNodes { * @param internalId Node internal ID. * @return Removed node. */ - ZookeeperClusterNode removeNode(int internalId) { + ZookeeperClusterNode removeNode(long internalId) { ZookeeperClusterNode node = nodesByInternalId.remove(internalId); assert node != null : internalId; http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java index 68cbdb8..23495aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java @@ -18,7 +18,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.io.Serializable; -import org.apache.ignite.internal.util.GridIntList; +import org.apache.ignite.internal.util.GridLongList; import org.jetbrains.annotations.Nullable; /** @@ -29,7 +29,7 @@ class ZkCommunicationErrorResolveResult implements Serializable { private static final long serialVersionUID = 0L; /** */ - final GridIntList killedNodes; + final GridLongList killedNodes; /** */ final Exception err; @@ -38,7 +38,7 @@ class ZkCommunicationErrorResolveResult implements Serializable { * @param killedNodes Killed nodes. * @param err Error. */ - ZkCommunicationErrorResolveResult(@Nullable GridIntList killedNodes, Exception err) { + ZkCommunicationErrorResolveResult(@Nullable GridLongList killedNodes, Exception err) { this.killedNodes = killedNodes; this.err = err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java index d8f9a3d..7216f39 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java @@ -45,7 +45,7 @@ abstract class ZkDiscoveryEventData implements Serializable { private final long topVer; /** */ - private transient Set<Integer> remainingAcks; + private transient Set<Long> remainingAcks; /** */ int flags; @@ -101,7 +101,7 @@ abstract class ZkDiscoveryEventData implements Serializable { /** * @return Remaining acks. */ - Set<Integer> remainingAcks() { + Set<Long> remainingAcks() { return remainingAcks; } @@ -110,7 +110,7 @@ abstract class ZkDiscoveryEventData implements Serializable { * @param ackEvtId Last event ID processed on node. * @return {@code True} if all nodes processed event. */ - boolean onAckReceived(Integer nodeInternalId, long ackEvtId) { + boolean onAckReceived(Long nodeInternalId, long ackEvtId) { assert remainingAcks != null; if (ackEvtId >= evtId) http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java index 6bdf573..f8727e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java @@ -40,26 +40,47 @@ class ZkDiscoveryEventsData implements Serializable { long topVer; /** */ - long gridStartTime; + long maxInternalOrder; /** */ - TreeMap<Long, ZkDiscoveryEventData> evts; + final long startInternalOrder; + + /** */ + final long gridStartTime; + + /** */ + final TreeMap<Long, ZkDiscoveryEventData> evts; /** */ private UUID commErrFutId; /** + * @param startInternalOrder First * @param topVer Current topology version. * @param gridStartTime Cluster start time. * @param evts Events history. */ - ZkDiscoveryEventsData(long gridStartTime, long topVer, TreeMap<Long, ZkDiscoveryEventData> evts) { + ZkDiscoveryEventsData( + long startInternalOrder, + long gridStartTime, + long topVer, + TreeMap<Long, ZkDiscoveryEventData> evts) + { + this.startInternalOrder = startInternalOrder; this.gridStartTime = gridStartTime; this.topVer = topVer; this.evts = evts; } /** + * @param node Joined node. + */ + void onNodeJoin(ZookeeperClusterNode node) { + if (node.internalId() > maxInternalOrder) + maxInternalOrder = node.internalId(); + } + + /** * @return Future ID. */ @Nullable UUID communicationErrorResolveFutureId() { http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java index a5dbd02..b75cb7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java @@ -27,14 +27,14 @@ class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData { private static final long serialVersionUID = 0L; /** */ - private int failedNodeInternalId; + private long failedNodeInternalId; /** * @param evtId Event ID. * @param topVer Topology version. * @param failedNodeInternalId Failed node ID. */ - ZkDiscoveryNodeFailEventData(long evtId, long topVer, int failedNodeInternalId) { + ZkDiscoveryNodeFailEventData(long evtId, long topVer, long failedNodeInternalId) { super(evtId, EventType.EVT_NODE_FAILED, topVer); this.failedNodeInternalId = failedNodeInternalId; @@ -43,7 +43,7 @@ class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData { /** * @return Failed node ID. */ - int failedNodeInternalId() { + long failedNodeInternalId() { return failedNodeInternalId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java index 89f7b42..93341d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java @@ -28,7 +28,7 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { private static final long serialVersionUID = 0L; /** */ - final int joinedInternalId; + final long joinedInternalId; /** */ final UUID nodeId; @@ -61,7 +61,7 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { ZkDiscoveryNodeJoinEventData(long evtId, long topVer, UUID nodeId, - int joinedInternalId, + long joinedInternalId, UUID joinDataPrefixId, int joinDataPartCnt, int dataForJoinedPartCnt, http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index c095e73..8f35a8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -177,7 +177,7 @@ class ZkIgnitePaths { * @param path Alive node zk path. * @return Node internal ID. */ - static int aliveInternalId(String path) { + static long aliveInternalId(String path) { int idx = path.lastIndexOf('|'); return Integer.parseInt(path.substring(idx + 1)); http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java index 6040c20..a73312c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java @@ -28,7 +28,7 @@ class ZkInternalJoinErrorMessage implements ZkInternalMessage { transient boolean notifyNode = true; /** */ - final int nodeInternalId; + final long nodeInternalId; /** */ final String err; @@ -37,7 +37,7 @@ class ZkInternalJoinErrorMessage implements ZkInternalMessage { * @param nodeInternalId Joining node internal ID. * @param err Error message. */ - ZkInternalJoinErrorMessage(int nodeInternalId, String err) { + ZkInternalJoinErrorMessage(long nodeInternalId, String err) { this.nodeInternalId = nodeInternalId; this.err = err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java index 7479c68..02e75ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java @@ -43,13 +43,16 @@ class ZkRuntimeState { ZookeeperClient zkClient; /** */ - int internalOrder; + long internalOrder; /** */ int joinDataPartCnt; /** */ - IgniteSpiTimeoutObject joinErrTimeoutObj; + ZkTimeoutObject joinErrTimeoutObj; + + /** */ + ZkTimeoutObject joinTimeoutObj; /** */ long gridStartTime; http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java index b2d2344..4d3d5b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java @@ -30,6 +30,9 @@ abstract class ZkTimeoutObject implements IgniteSpiTimeoutObject { /** */ private final long endTime; + /** */ + volatile boolean cancelled; + /** * @param timeout Timeout. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java index 75e5715..33be383 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java @@ -55,7 +55,7 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable, Co private Serializable consistentId; /** */ - private int internalId; + private long internalId; /** */ private long order; @@ -258,14 +258,14 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable, Co /** * @return Internal ID corresponds to Zookeeper sequential node. */ - int internalId() { + long internalId() { return internalId; } /** * @param internalId Internal ID corresponds to Zookeeper sequential node. */ - void internalId(int internalId) { + void internalId(long internalId) { this.internalId = internalId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index de89df7..30bd750 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -54,13 +54,12 @@ import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.internal.processors.security.SecurityContext; -import org.apache.ignite.internal.util.GridIntList; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -94,6 +93,9 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2; import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL; import static org.apache.zookeeper.CreateMode.PERSISTENT; @@ -462,7 +464,7 @@ public class ZookeeperDiscoveryImpl { try { locNode.onClientDisconnected(newId); - joinTopology(true, rtState.joined); + joinTopology(rtState); } catch (Exception e) { U.error(log, "Failed to reconnect: " + e, e); @@ -654,7 +656,7 @@ public class ZookeeperDiscoveryImpl { * @throws InterruptedException If interrupted. */ public void startJoinAndWait() throws InterruptedException { - joinTopology(false, false); + joinTopology(null); for (;;) { try { @@ -677,16 +679,19 @@ public class ZookeeperDiscoveryImpl { } /** - * @param reconnect {@code True} if client node reconnects. - * @param prevJoined {@code True} if reconnect after already joined topology - * in this case (need produce EVT_CLIENT_NODE_RECONNECTED event). + * @param prevState Previous state in case of connect retry. * @throws InterruptedException If interrupted. */ - private void joinTopology(boolean reconnect, boolean prevJoined) throws InterruptedException { + private void joinTopology(@Nullable ZkRuntimeState prevState) throws InterruptedException { if (!busyLock.enterBusy()) return; try { + boolean reconnect = prevState != null; + + // Need fire EVT_CLIENT_NODE_RECONNECTED event if reconnect after already joined. + boolean prevJoined = prevState != null && prevState.joined; + IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr; if (internalLsnr != null) @@ -733,7 +738,7 @@ public class ZookeeperDiscoveryImpl { throw new IgniteSpiException("Failed to create Zookeeper client", e); } - startJoin(rtState, joinDataBytes); + startJoin(rtState, prevState, joinDataBytes); } finally { busyLock.leaveBusy(); @@ -894,10 +899,14 @@ public class ZookeeperDiscoveryImpl { } /** + * @param rtState Runtime state. * @param joinDataBytes Joining node data. + * @param prevState Previous state in case of connect retry. * @throws InterruptedException If interrupted. */ - private void startJoin(ZkRuntimeState rtState, final byte[] joinDataBytes) throws InterruptedException { + private void startJoin(ZkRuntimeState rtState, @Nullable ZkRuntimeState prevState, final byte[] joinDataBytes) + throws InterruptedException + { try { long startTime = System.currentTimeMillis(); @@ -940,16 +949,18 @@ public class ZookeeperDiscoveryImpl { null, EPHEMERAL_SEQUENTIAL); - log.info("Node started join [nodeId=" + locNode.id() + - ", instanceName=" + locNode.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + - ", joinDataSize=" + joinDataBytes.length + - ", joinDataPartCnt=" + rtState.joinDataPartCnt + - ", consistentId=" + locNode.consistentId() + - ", initTime=" + (System.currentTimeMillis() - startTime) + - ", nodePath=" + rtState.locNodeZkPath + ']'); - rtState.internalOrder = ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath); + if (log.isInfoEnabled()) { + log.info("Node started join [nodeId=" + locNode.id() + + ", instanceName=" + locNode.attribute(ATTR_IGNITE_INSTANCE_NAME) + + ", joinDataSize=" + joinDataBytes.length + + (rtState.joinDataPartCnt > 1 ? (", joinDataPartCnt=" + rtState.joinDataPartCnt) : "") + + ", consistentId=" + locNode.consistentId() + + ", initTime=" + (System.currentTimeMillis() - startTime) + + ", nodePath=" + rtState.locNodeZkPath + ']'); + } + /* If node can not join due to validation error this error is reported in join data, As a minor optimization do not start watch join data immediately, but only if do not receive @@ -959,20 +970,24 @@ public class ZookeeperDiscoveryImpl { rtState.joinErrTimeoutObj = joinErrorWatcher.timeoutObj; + if (locNode.isClient() && spi.getJoinTimeout() > 0) { + ZkTimeoutObject joinTimeoutObj = prevState != null ? prevState.joinTimeoutObj : null; + + if (joinTimeoutObj == null) { + joinTimeoutObj = new JoinTimeoutObject(spi.getJoinTimeout()); + + spi.getSpiContext().addTimeoutObject(joinTimeoutObj); + } + + rtState.joinTimeoutObj = joinTimeoutObj; + } + if (!locNode.isClient()) zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback(rtState)); zkClient.getDataAsync(zkPaths.evtsPath, rtState.watcher, rtState.watcher); spi.getSpiContext().addTimeoutObject(rtState.joinErrTimeoutObj); - - if (locNode.isClient() && spi.getJoinTimeout() > 0) { - if (!rtState.prevJoined) { - JoinTimeoutObject joinTimeoutObj = new JoinTimeoutObject(spi.getJoinTimeout()); - - spi.getSpiContext().addTimeoutObject(joinTimeoutObj); - } - } } catch (IgniteCheckedException | ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); @@ -996,6 +1011,7 @@ public class ZookeeperDiscoveryImpl { try { SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred); + // Note: exception message test is checked in tests. if (subj == null) throw new IgniteSpiException("Authentication failed for local node."); @@ -1004,7 +1020,7 @@ public class ZookeeperDiscoveryImpl { Map<String, Object> attrs = new HashMap<>(locNode.attributes()); - attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2, U.marshal(marsh, subj)); + attrs.put(ATTR_SECURITY_SUBJECT_V2, U.marshal(marsh, subj)); locNode.setAttributes(attrs); } @@ -1023,7 +1039,7 @@ public class ZookeeperDiscoveryImpl { Map<String, Object> attrs = new HashMap<>(node.getAttributes()); - attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2, unzip(zipBytes)); + attrs.put(ATTR_SECURITY_SUBJECT_V2, unzip(zipBytes)); node.setAttributes(attrs); } @@ -1034,7 +1050,7 @@ public class ZookeeperDiscoveryImpl { * @throws IgniteCheckedException If failed to unmarshal. */ private SecurityCredentials unmarshalCredentials(ZookeeperClusterNode node) throws Exception { - byte[] credBytes = (byte[])node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); + byte[] credBytes = (byte[])node.getAttributes().get(ATTR_SECURITY_CREDENTIALS); if (credBytes == null) return null; @@ -1053,14 +1069,14 @@ public class ZookeeperDiscoveryImpl { // Use security-unsafe getter. Map<String, Object> attrs0 = node.getAttributes(); - Object creds = attrs0.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); + Object creds = attrs0.get(ATTR_SECURITY_CREDENTIALS); if (creds != null) { Map<String, Object> attrs = new HashMap<>(attrs0); assert !(creds instanceof byte[]); - attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, marshalZip(creds)); + attrs.put(ATTR_SECURITY_CREDENTIALS, marshalZip(creds)); node.setAttributes(attrs); } @@ -1083,17 +1099,27 @@ public class ZookeeperDiscoveryImpl { /** {@inheritDoc} */ @Override public void onTimeout() { - if (rtState.joined) + if (cancelled || rtState.joined) return; runInWorkerThread(new Runnable() { @Override public void run() { synchronized (stateMux) { + if (cancelled || rtState.joined) + return; + if (connState == ConnectionState.STOPPED) return; connState = ConnectionState.STOPPED; } + + U.warn(log, "Failed to connect to cluster, either connection to ZooKeeper can not be established or there " + + "are no alive server nodes (consider increasing 'joinTimeout' configuration property) [" + + "joinTimeout=" + spi.getJoinTimeout() + ']'); + + // Note: exception message test is checked in tests. + onSegmented(new IgniteSpiException("Failed to connect to cluster within configured timeout")); } }); } @@ -1190,32 +1216,37 @@ public class ZookeeperDiscoveryImpl { private void checkIsCoordinator(int rc, final List<String> aliveNodes) throws Exception { assert rc == 0 : KeeperException.Code.get(rc); - TreeMap<Integer, String> alives = new TreeMap<>(); + TreeMap<Long, String> aliveSrvs = new TreeMap<>(); - int locInternalId = ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath); + long locInternalOrder = rtState.internalOrder; for (String aliveNodePath : aliveNodes) { - Integer internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); + if (ZkIgnitePaths.aliveClientNode(aliveNodePath)) + continue; + + Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); - alives.put(internalId, aliveNodePath); + aliveSrvs.put(internalId, aliveNodePath); } - assert !alives.isEmpty(); + assert !aliveSrvs.isEmpty(); - Map.Entry<Integer, String> crdE = alives.firstEntry(); + Map.Entry<Long, String> crdE = aliveSrvs.firstEntry(); - if (locInternalId == crdE.getKey()) - onBecomeCoordinator(aliveNodes, locInternalId); + if (locInternalOrder == crdE.getKey()) + onBecomeCoordinator(aliveNodes); else { - assert alives.size() > 1; + assert aliveSrvs.size() > 1 : aliveSrvs; - Map.Entry<Integer, String> prevE = alives.floorEntry(locInternalId - 1); + Map.Entry<Long, String> prevE = aliveSrvs.floorEntry(locInternalOrder - 1); assert prevE != null; - log.info("Discovery coordinator already exists, watch for previous node [" + - "locId=" + locNode.id() + - ", prevPath=" + prevE.getValue() + ']'); + if (log.isInfoEnabled()) { + log.info("Discovery coordinator already exists, watch for previous server node [" + + "locId=" + locNode.id() + + ", prevPath=" + prevE.getValue() + ']'); + } PreviousNodeWatcher watcher = new PreviousNodeWatcher(rtState); @@ -1270,14 +1301,10 @@ public class ZookeeperDiscoveryImpl { /** * @param aliveNodes Alive nodes paths. - * @param locInternalId Local node's internal ID. * @throws Exception If failed. */ - private void onBecomeCoordinator(List<String> aliveNodes, int locInternalId) throws Exception { - byte[] evtsDataBytes = rtState.zkClient.getData(zkPaths.evtsPath); - - if (evtsDataBytes.length > 0) - processNewEvents(evtsDataBytes); + private void onBecomeCoordinator(List<String> aliveNodes) throws Exception { + ZkDiscoveryEventsData prevEvts = processNewEvents(rtState.zkClient.getData(zkPaths.evtsPath)); rtState.crd = true; @@ -1313,7 +1340,7 @@ public class ZookeeperDiscoveryImpl { } else { if (log.isInfoEnabled()) - log.info("Node is first cluster node [locId=" + locNode.id() + ']'); + log.info("Node is first server node in cluster [locId=" + locNode.id() + ']'); DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator(); @@ -1331,7 +1358,7 @@ public class ZookeeperDiscoveryImpl { } } - newClusterStarted(locInternalId); + newClusterStarted(prevEvts); } rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher); @@ -1363,10 +1390,13 @@ public class ZookeeperDiscoveryImpl { if (log.isInfoEnabled()) log.info("Process alive nodes change [alives=" + aliveNodes.size() + "]"); - TreeMap<Integer, String> alives = new TreeMap<>(); + TreeMap<Long, String> alives = new TreeMap<>(); for (String child : aliveNodes) { - Integer internalId = ZkIgnitePaths.aliveInternalId(child); + Long internalId = ZkIgnitePaths.aliveInternalId(child); + + if (internalId < rtState.evtsData.startInternalOrder) + continue; Object old = alives.put(internalId, child); @@ -1381,7 +1411,7 @@ public class ZookeeperDiscoveryImpl { List<ZookeeperClusterNode> failedNodes = null; - for (Map.Entry<Integer, ZookeeperClusterNode> e : rtState.top.nodesByInternalId.entrySet()) { + for (Map.Entry<Long, ZookeeperClusterNode> e : rtState.top.nodesByInternalId.entrySet()) { if (!alives.containsKey(e.getKey())) { ZookeeperClusterNode failedNode = e.getValue(); @@ -1411,8 +1441,8 @@ public class ZookeeperDiscoveryImpl { } } - for (Map.Entry<Integer, String> e : alives.entrySet()) { - Integer internalId = e.getKey(); + for (Map.Entry<Long, String> e : alives.entrySet()) { + Long internalId = e.getKey(); if (!rtState.top.nodesByInternalId.containsKey(internalId)) { UUID rslvFutId = rtState.evtsData.communicationErrorResolveFutureId(); @@ -1559,7 +1589,7 @@ public class ZookeeperDiscoveryImpl { */ private boolean processJoinOnCoordinator( TreeMap<Long, ZookeeperClusterNode> curTop, - int internalId, + long internalId, String aliveNodePath) throws Exception { @@ -1694,6 +1724,7 @@ public class ZookeeperDiscoveryImpl { "Authentication failed [nodeId=" + U.id8(node.id()) + ", addrs=" + U.addressesAsString(node) + ']'); + // Note: exception message test is checked in tests. return new ZkNodeValidateResult("Authentication failed"); } @@ -1778,7 +1809,7 @@ public class ZookeeperDiscoveryImpl { private void generateNodeJoin( TreeMap<Long, ZookeeperClusterNode> curTop, ZkJoiningNodeData joiningNodeData, - int internalId, + long internalId, UUID prefixId, @Nullable byte[] secSubjZipBytes) throws Exception @@ -1847,6 +1878,8 @@ public class ZookeeperDiscoveryImpl { dataForJoinedPartCnt, secSubjPartCnt); + rtState.evtsData.onNodeJoin(joinedNode); + evtData.joiningNodeData = joiningNodeData; rtState.evtsData.addEvent(dataForJoined.topology(), evtData); @@ -1886,24 +1919,34 @@ public class ZookeeperDiscoveryImpl { } /** - * @param locInternalId Local node internal ID. + * @param prevEvts Events from previous cluster. * @throws Exception If failed. */ @SuppressWarnings("unchecked") - private void newClusterStarted(int locInternalId) throws Exception { + private void newClusterStarted(@Nullable ZkDiscoveryEventsData prevEvts) throws Exception { + assert prevEvts == null || prevEvts.maxInternalOrder < locNode.internalId(); + spi.getSpiContext().removeTimeoutObject(rtState.joinErrTimeoutObj); cleanupPreviousClusterData(); + long locInternalId = rtState.internalOrder; + rtState.joined = true; rtState.gridStartTime = U.currentTimeMillis(); - rtState.evtsData = new ZkDiscoveryEventsData(rtState.gridStartTime, 1L, new TreeMap<Long, ZkDiscoveryEventData>()); + rtState.evtsData = new ZkDiscoveryEventsData( + prevEvts != null ? prevEvts.maxInternalOrder + 1 : locInternalId, + rtState.gridStartTime, + 1L, + new TreeMap<Long, ZkDiscoveryEventData>()); locNode.internalId(locInternalId); locNode.order(1); + rtState.evtsData.onNodeJoin(locNode); + rtState.top.addNode(locNode); String locAlivePath = rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1); @@ -2176,10 +2219,11 @@ public class ZookeeperDiscoveryImpl { /** * @param data Marshalled events. * @throws Exception If failed. + * @return Events. */ - private void processNewEvents(byte[] data) throws Exception { + @Nullable private ZkDiscoveryEventsData processNewEvents(byte[] data) throws Exception { if (data.length == 0) - return; + return null; assert !rtState.crd; @@ -2203,6 +2247,8 @@ public class ZookeeperDiscoveryImpl { processNewEvents(newEvts); rtState.evtsData = newEvts; + + return newEvts; } /** @@ -2260,7 +2306,7 @@ public class ZookeeperDiscoveryImpl { exchange.onExchange(dataBag); } - if (evtData0.secSubjPartCnt > 0 && joiningData.node().attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2) == null) + if (evtData0.secSubjPartCnt > 0 && joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null) readAndInitSecuritySubject(joiningData.node(), evtData0); notifyNodeJoin(evtData0, joiningData); @@ -2385,66 +2431,78 @@ public class ZookeeperDiscoveryImpl { private void processLocalJoin(ZkDiscoveryEventsData evtsData, final ZkDiscoveryNodeJoinEventData evtData) throws Exception { - if (log.isInfoEnabled()) - log.info("Local join event data: " + evtData + ']'); + synchronized (stateMux) { + if (connState == ConnectionState.STOPPED) + return; - spi.getSpiContext().removeTimeoutObject(rtState.joinErrTimeoutObj); + if (rtState.joinTimeoutObj != null) { + spi.getSpiContext().removeTimeoutObject(rtState.joinTimeoutObj); - String path = zkPaths.joinEventDataPathForJoined(evtData.eventId()); + rtState.joinTimeoutObj.cancelled = true; + rtState.joinTimeoutObj = null; + } - byte[] dataForJoinedBytes = readMultipleParts(rtState.zkClient, path, evtData.dataForJoinedPartCnt); + spi.getSpiContext().removeTimeoutObject(rtState.joinErrTimeoutObj); - ZkJoinEventDataForJoined dataForJoined = unmarshalZip(dataForJoinedBytes); + if (log.isInfoEnabled()) + log.info("Local join event data: " + evtData + ']'); - rtState.gridStartTime = evtsData.gridStartTime; + String path = zkPaths.joinEventDataPathForJoined(evtData.eventId()); - locNode.internalId(evtData.joinedInternalId); - locNode.order(evtData.topologyVersion()); + byte[] dataForJoinedBytes = readMultipleParts(rtState.zkClient, path, evtData.dataForJoinedPartCnt); - readAndInitSecuritySubject(locNode, evtData); + ZkJoinEventDataForJoined dataForJoined = unmarshalZip(dataForJoinedBytes); - DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id()); + rtState.gridStartTime = evtsData.gridStartTime; - dataBag.commonData(dataForJoined.discoveryData()); + locNode.internalId(evtData.joinedInternalId); + locNode.order(evtData.topologyVersion()); - exchange.onExchange(dataBag); + readAndInitSecuritySubject(locNode, evtData); - List<ZookeeperClusterNode> allNodes = dataForJoined.topology(); + DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id()); - for (int i = 0; i < allNodes.size(); i++) { - ZookeeperClusterNode node = allNodes.get(i); + dataBag.commonData(dataForJoined.discoveryData()); - node.setMetrics(new ClusterMetricsSnapshot()); + exchange.onExchange(dataBag); - rtState.top.addNode(node); - } + List<ZookeeperClusterNode> allNodes = dataForJoined.topology(); - rtState.top.addNode(locNode); + for (int i = 0; i < allNodes.size(); i++) { + ZookeeperClusterNode node = allNodes.get(i); - final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); + node.setMetrics(new ClusterMetricsSnapshot()); - lsnr.onDiscovery(evtData.eventType(), - evtData.topologyVersion(), - locNode, - topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + rtState.top.addNode(node); + } - if (rtState.prevJoined) { - lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED, + rtState.top.addNode(locNode); + + final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); + + lsnr.onDiscovery(evtData.eventType(), evtData.topologyVersion(), locNode, topSnapshot, Collections.<Long, Collection<ClusterNode>>emptyMap(), null); - U.quietAndWarn(log, "Client node was reconnected after it was already considered failed [locId=" + locNode.id() + ']'); + if (rtState.prevJoined) { + lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED, + evtData.topologyVersion(), + locNode, + topSnapshot, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + + U.quietAndWarn(log, "Client node was reconnected after it was already considered failed [locId=" + locNode.id() + ']'); + } + + rtState.joined = true; } joinFut.onDone(); - rtState.joined = true; - deleteDataForJoinedAsync(evtData); } @@ -2532,7 +2590,7 @@ public class ZookeeperDiscoveryImpl { failedNodes = U.newHashSet(res.killedNodes.size()); for (int i = 0; i < res.killedNodes.size(); i++) { - int internalId = res.killedNodes.get(i); + long internalId = res.killedNodes.get(i); if (internalId == locNode.internalId()) { fut.onError(new IgniteCheckedException("Local node is forced to stop " + @@ -2555,7 +2613,7 @@ public class ZookeeperDiscoveryImpl { long topVer = msg.topVer; for (int i = 0; i < res.killedNodes.size(); i++) { - int nodeInternalId = res.killedNodes.get(i); + long nodeInternalId = res.killedNodes.get(i); ClusterNode node = rtState.top.nodesByInternalId.get(nodeInternalId); @@ -2579,7 +2637,7 @@ public class ZookeeperDiscoveryImpl { * @param internalIds Nodes internal IDs. * @throws Exception If failed. */ - private void deleteAliveNodes(@Nullable GridIntList internalIds) throws Exception { + private void deleteAliveNodes(@Nullable GridLongList internalIds) throws Exception { if (internalIds == null) return; @@ -2720,7 +2778,7 @@ public class ZookeeperDiscoveryImpl { long topVer = evtsData.topVer; - GridIntList killedNodesList = null; + GridLongList killedNodesList = null; if (err == null) { boolean fullyConnected = true; @@ -2765,7 +2823,7 @@ public class ZookeeperDiscoveryImpl { ", nodeIds=" + U.nodeIds(killedNodes) + ']'); } - killedNodesList = new GridIntList(killedNodes.size()); + killedNodesList = new GridLongList(killedNodes.size()); for (ClusterNode killedNode : killedNodes) { killedNodesList.add(((ZookeeperClusterNode)killedNode).internalId()); @@ -2913,7 +2971,7 @@ public class ZookeeperDiscoveryImpl { * @param nodeInternalOrder Node order. * @param topVer Topology version. */ - private void notifyNodeFail(int nodeInternalOrder, long topVer) { + private void notifyNodeFail(long nodeInternalOrder, long topVer) { final ZookeeperClusterNode failedNode = rtState.top.removeNode(nodeInternalOrder); assert failedNode != null && !failedNode.isLocal() : failedNode; @@ -3574,7 +3632,7 @@ public class ZookeeperDiscoveryImpl { if (data.length > 0) { ZkAliveNodeData nodeData = unmarshalZip(data); - Integer nodeInternalId = ZkIgnitePaths.aliveInternalId(path); + Long nodeInternalId = ZkIgnitePaths.aliveInternalId(path); Iterator<ZkDiscoveryEventData> it = rtState.evtsData.evts.values().iterator(); http://git-wip-us.apache.org/repos/asf/ignite/blob/df49a51b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 524e69b..2a4fa76 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -148,7 +148,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { private boolean testCommSpi; /** */ - private int sesTimeout; + private long sesTimeout; + + /** */ + private long joinTimeout; /** */ private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>(); @@ -189,6 +192,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); + if (joinTimeout != 0) + zkSpi.setJoinTimeout(joinTimeout); + zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000); // Set authenticator for basic sanity tests. @@ -2510,9 +2516,52 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * + */ + public void testStartNoServers_FailOnTimeout() { + joinTimeout = 3000; + + client = true; + + long start = System.currentTimeMillis(); + + Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(0); + + return null; + } + }, IgniteCheckedException.class, null); + + assertTrue(System.currentTimeMillis() >= start + joinTimeout); + + IgniteSpiException spiErr = X.cause(err, IgniteSpiException.class); + + assertNotNull(spiErr); + assertTrue(spiErr.getMessage().contains("Failed to connect to cluster within configured timeout")); + } + + /** + * @throws Exception If failed. + */ + public void testStartNoServer_WaitForServers1() throws Exception { + startNoServer_WaitForServers(0); + } + + /** * @throws Exception If failed. */ - public void testClientStartNoServers() throws Exception { + public void testStartNoServer_WaitForServers2() throws Exception { + startNoServer_WaitForServers(10_000); + } + + /** + * @param joinTimeout Join timeout. + * @throws Exception If failed. + */ + private void startNoServer_WaitForServers(long joinTimeout) throws Exception { + this.joinTimeout = joinTimeout; + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { client = true; @@ -2525,7 +2574,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { waitSpi(getTestIgniteInstanceName(0)); + startGrid(1); + fut.get(); + + waitForTopology(2); } /**
