http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java new file mode 100644 index 0000000..0c79c36 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; + +/** + * Zk Communication Error Resolve Start Message. + * Carries only a unique ID; it has no ack message, is not mutable and does not stop processing. 
+ */ +public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { + /** Serialization version. */ + private static final long serialVersionUID = 0L; + + /** Unique ID passed to the constructor. */ + final UUID id; + + /** + * @param id Unique ID. + */ + ZkCommunicationErrorResolveStartMessage(UUID id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkCommunicationErrorResolveStartMessage.class, this); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java new file mode 100644 index 0000000..d27b717 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.CommunicationFailureContext; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * {@link CommunicationFailureContext} implementation built from topology snapshots, per-node communication state + * and the shared cache context. + */ +class ZkCommunicationFailureContext implements CommunicationFailureContext { + /** Compares nodes by {@link ClusterNode#order()}. NOTE(review): it is used for binary search over node lists, which presumably are sorted by order - confirm against callers. */ + private static final Comparator<ClusterNode> NODE_ORDER_CMP = new Comparator<ClusterNode>() { + @Override public int compare(ClusterNode node1, ClusterNode node2) { + return Long.compare(node1.order(), node2.order()); + } + }; + + /** Nodes passed to {@link #killNode(ClusterNode)}. */ + private Set<ClusterNode> killedNodes = new HashSet<>(); + + /** Nodes communication state: node ID mapped to bits indexed by node position in {@link #initialNodes}. */ + private final Map<UUID, BitSet> nodesState; + + /** Topology snapshot when communication error resolve started. */ + private final List<ClusterNode> initialNodes; + + /** Current topology snapshot (wrapped into unmodifiable list). */ + private final List<ClusterNode> curNodes; + + /** Shared cache context. */ + private final GridCacheSharedContext<?, ?> ctx; + + /** + * @param ctx Context. + * @param curNodes Current topology snapshot. + * @param initialNodes Topology snapshot when communication error resolve started. + * @param nodesState Nodes communication state. 
+ */ + ZkCommunicationFailureContext( + GridCacheSharedContext<?, ?> ctx, + List<ClusterNode> curNodes, + List<ClusterNode> initialNodes, + Map<UUID, BitSet> nodesState) + { + this.ctx = ctx; + this.curNodes = Collections.unmodifiableList(curNodes); + this.initialNodes = initialNodes; + this.nodesState = nodesState; + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> topologySnapshot() { + return curNodes; + } + + /** {@inheritDoc} */ + @Override public boolean connectionAvailable(ClusterNode node1, ClusterNode node2) { + BitSet nodeState = nodesState.get(node1.id()); + + if (nodeState == null) + throw new IllegalArgumentException("Invalid node: " + node1); + + int nodeIdx = Collections.binarySearch(initialNodes, node2, NODE_ORDER_CMP); + + if (nodeIdx < 0) + throw new IllegalArgumentException("Invalid node: " + node2); + + assert nodeIdx < nodeState.size() : nodeIdx; + + return nodeState.get(nodeIdx); + } + + /** {@inheritDoc} */ + @Override public Map<String, CacheConfiguration<?, ?>> startedCaches() { + Map<Integer, DynamicCacheDescriptor> cachesMap = ctx.affinity().caches(); + + Map<String, CacheConfiguration<?, ?>> res = U.newHashMap(cachesMap.size()); + + for (DynamicCacheDescriptor desc : cachesMap.values()) { + if (desc.cacheType().userCache()) + res.put(desc.cacheName(), desc.cacheConfiguration()); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> cacheAffinity(String cacheName) { + if (cacheName == null) + throw new NullPointerException("Null cache name."); + + DynamicCacheDescriptor cacheDesc = ctx.affinity().caches().get(CU.cacheId(cacheName)); + + if (cacheDesc == null) + throw new IllegalArgumentException("Invalid cache name: " + cacheName); + + GridAffinityAssignmentCache aff = ctx.affinity().groupAffinity(cacheDesc.groupId()); + + assert aff != null : cacheName; + + return aff.readyAssignments(aff.lastVersion()); + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> 
cachePartitionOwners(String cacheName) { + if (cacheName == null) + throw new NullPointerException("Null cache name."); + + DynamicCacheDescriptor cacheDesc = ctx.affinity().caches().get(CU.cacheId(cacheName)); + + if (cacheDesc == null) + throw new IllegalArgumentException("Invalid cache name: " + cacheName); + + if (cacheDesc.cacheConfiguration().getCacheMode() == CacheMode.LOCAL) + return Collections.emptyList(); + + CacheGroupContext grp = ctx.cache().cacheGroup(cacheDesc.groupId()); + + GridDhtPartitionTopology top; + + if (grp == null) { + top = ctx.exchange().clientTopologyIfExists(cacheDesc.groupId()); + + assert top != null : cacheName; + } + else + top = grp.topology(); + + return top.allOwners(); + } + + /** {@inheritDoc} */ + @Override public void killNode(ClusterNode node) { + if (node == null) + throw new NullPointerException(); + + if (Collections.binarySearch(curNodes, node, NODE_ORDER_CMP) < 0) + throw new IllegalArgumentException("Invalid node: " + node); + + killedNodes.add(node); + } + + /** + * @return Nodes to fail. 
+ */ + Set<ClusterNode> killedNodes() { + return killedNodes; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "ZkCommunicationFailureContext []"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java new file mode 100644 index 0000000..21dfe62 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; + +/** + * Discovery event data for custom event ({@code ZK_EVT_CUSTOM_EVT}). + */ +class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData { + /** Serialization version. */ + private static final long serialVersionUID = 0L; + + /** For acknowledge events ID of original event (non-zero value marks ack event, see {@link #ackEvent()}). */ + final long origEvtId; + + /** Sender node ID (asserted to be not {@code null}). */ + final UUID sndNodeId; + + /** Event path. */ + final String evtPath; + + /** Message instance (can be marshalled as part of ZkDiscoveryCustomEventData or stored in separate znode). */ + DiscoverySpiCustomMessage msg; + + /** Unmarshalled message. */ + transient DiscoverySpiCustomMessage resolvedMsg; + + /** + * @param evtId Event ID. + * @param origEvtId For acknowledge events ID of original event. + * @param topVer Topology version. + * @param sndNodeId Sender node ID. + * @param msg Message instance. + * @param evtPath Event path. + */ + ZkDiscoveryCustomEventData( + long evtId, + long origEvtId, + long topVer, + UUID sndNodeId, + DiscoverySpiCustomMessage msg, + String evtPath) + { + super(evtId, ZK_EVT_CUSTOM_EVT, topVer); + + assert sndNodeId != null; + assert msg != null || origEvtId != 0 || !F.isEmpty(evtPath); + + this.origEvtId = origEvtId; + this.msg = msg; + this.sndNodeId = sndNodeId; + this.evtPath = evtPath; + } + + /** + * @return {@code True} for custom event ack message. 
+ */ + boolean ackEvent() { + return origEvtId != 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "ZkDiscoveryCustomEventData [" + + "evtId=" + eventId() + + ", topVer=" + topologyVersion() + + ", sndNode=" + sndNodeId + + ", ack=" + ackEvent() + + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java new file mode 100644 index 0000000..d667a17 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Set; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Base class for discovery events data: keeps event ID, type and topology version and tracks + * which nodes still have to ack the event. + */ +abstract class ZkDiscoveryEventData implements Serializable { + /** Event type: node join. */ + static final byte ZK_EVT_NODE_JOIN = 1; + + /** Event type: node failed. */ + static final byte ZK_EVT_NODE_FAILED = 2; + + /** Event type: custom event. */ + static final byte ZK_EVT_CUSTOM_EVT = 3; + + /** Serialization version. */ + private static final long serialVersionUID = 0L; + + /** Event ID. */ + private final long evtId; + + /** Event type (one of {@code ZK_EVT_*} constants). */ + private final byte evtType; + + /** Topology version. */ + private final long topVer; + + /** Internal IDs of nodes which still have to ack this event ({@code null} until {@link #initRemainingAcks(Collection)} is called). */ + private transient Set<Long> remainingAcks; + + /** Bit flags, checked by {@link #flagSet(int)}. */ + int flags; + + /** + * @param evtId Event ID. + * @param evtType Event type. + * @param topVer Topology version. + */ + ZkDiscoveryEventData(long evtId, byte evtType, long topVer) { + assert evtType == ZK_EVT_NODE_JOIN || evtType == ZK_EVT_NODE_FAILED || evtType == ZK_EVT_CUSTOM_EVT : evtType; + + this.evtId = evtId; + this.evtType = evtType; + this.topVer = topVer; + } + + /** + * Initializes remaining acks with internal IDs of non-local nodes whose order is not greater than + * event topology version. + * + * @param nodes Current nodes in topology. + */ + void initRemainingAcks(Collection<ZookeeperClusterNode> nodes) { + assert remainingAcks == null : this; + + remainingAcks = U.newHashSet(nodes.size()); + + for (ZookeeperClusterNode node : nodes) { + if (!node.isLocal() && node.order() <= topVer) { + boolean add = remainingAcks.add(node.internalId()); + + assert add : node; + } + } + } + + /** + * Adds node to remaining acks, remaining acks should be already initialized. + * + * @param node Node. + */ + void addRemainingAck(ZookeeperClusterNode node) { + assert node.order() <= topVer : node; + + boolean add = remainingAcks.add(node.internalId()); + + assert add : node; + } + + /** + * @return {@code True} if all nodes processed event. + */ + boolean allAcksReceived() { + return remainingAcks.isEmpty(); + } + + /** + * @return Remaining acks. + */ + Set<Long> remainingAcks() { + return remainingAcks; + } + + /** + * @param nodeInternalId Node ID. 
+ * @param ackEvtId Last event ID processed on node. + * @return {@code True} if all nodes processed event. + */ + boolean onAckReceived(Long nodeInternalId, long ackEvtId) { + assert remainingAcks != null; + + if (ackEvtId >= evtId) + remainingAcks.remove(nodeInternalId); + + return remainingAcks.isEmpty(); + } + + /** + * @param node Failed node. + * @return {@code True} if all nodes processed event. + */ + boolean onNodeFail(ZookeeperClusterNode node) { + assert remainingAcks != null : this; + + remainingAcks.remove(node.internalId()); + + return remainingAcks.isEmpty(); + } + + /** + * @param flag Flag mask. + * @return {@code True} if flag set. + */ + boolean flagSet(int flag) { + return (flags & flag) == flag; + } + + /** + * @return Event ID. + */ + long eventId() { + return evtId; + } + + /** + * @return Event type. + */ + byte eventType() { + return evtType; + } + + /** + * @return Event topology version. + */ + long topologyVersion() { + return topVer; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java new file mode 100644 index 0000000..dce861b --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.Collection; +import java.util.TreeMap; +import java.util.UUID; +import org.jetbrains.annotations.Nullable; + +/** + * Discovery events state: cluster ID and start time, counters and the map of events to process. + */ +class ZkDiscoveryEventsData implements Serializable { + /** Serialization version. */ + private static final long serialVersionUID = 0L; + + /** Unique cluster ID (generated when first node in cluster starts). */ + final UUID clusterId; + + /** Internal order of last processed custom event. */ + long procCustEvt = -1; + + /** Event ID counter. */ + long evtIdGen; + + /** Current topology version. */ + long topVer; + + /** Max node internal order in cluster. */ + long maxInternalOrder; + + /** Cluster start time (recorded when first node in cluster starts). */ + final long clusterStartTime; + + /** Events to process. */ + final TreeMap<Long, ZkDiscoveryEventData> evts; + + /** ID of current active communication error resolve process. */ + private UUID commErrFutId; + + /** + * @param clusterStartTime Start time of first node in cluster. + * @return Events data with random cluster ID, topology version {@code 1} and empty events map. + */ + static ZkDiscoveryEventsData createForNewCluster(long clusterStartTime) { + return new ZkDiscoveryEventsData( + UUID.randomUUID(), + clusterStartTime, + 1L, + new TreeMap<Long, ZkDiscoveryEventData>() + ); + } + + /** + * @param clusterId Cluster ID. + * @param topVer Current topology version. 
+ * @param clusterStartTime Cluster start time. + * @param evts Events history. + */ + private ZkDiscoveryEventsData( + UUID clusterId, + long clusterStartTime, + long topVer, + TreeMap<Long, ZkDiscoveryEventData> evts) + { + this.clusterId = clusterId; + this.clusterStartTime = clusterStartTime; + this.topVer = topVer; + this.evts = evts; + } + + /** + * Raises max node internal order if joined node internal ID is greater than current value. + * + * @param node Joined node. + */ + void onNodeJoin(ZookeeperClusterNode node) { + if (node.internalId() > maxInternalOrder) + maxInternalOrder = node.internalId(); + } + + /** + * @return Future ID. + */ + @Nullable UUID communicationErrorResolveFutureId() { + return commErrFutId; + } + + /** + * @param id Future ID. + */ + void communicationErrorResolveFutureId(@Nullable UUID id) { + commErrFutId = id; + } + + /** + * @param nodes Current nodes in topology (these nodes should ack that event processed). + * @param evt Event. + */ + void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData evt) { + Object old = evts.put(evt.eventId(), evt); + + assert old == null : old; + + evt.initRemainingAcks(nodes); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java new file mode 100644 index 0000000..c76158f --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +/** + * Discovery event data for node failed event ({@code ZK_EVT_NODE_FAILED}). + */ +class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData { + /** Serialization version. */ + private static final long serialVersionUID = 0L; + + /** Failed node internal ID. */ + private long failedNodeInternalId; + + /** + * @param evtId Event ID. + * @param topVer Topology version. + * @param failedNodeInternalId Failed node internal ID. + */ + ZkDiscoveryNodeFailEventData(long evtId, long topVer, long failedNodeInternalId) { + super(evtId, ZK_EVT_NODE_FAILED, topVer); + + this.failedNodeInternalId = failedNodeInternalId; + } + + /** + * @return Failed node internal ID. 
+ */ + long failedNodeInternalId() { + return failedNodeInternalId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "ZkDiscoveryNodeFailEventData [" + + "evtId=" + eventId() + + ", topVer=" + topologyVersion() + + ", nodeId=" + failedNodeInternalId + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java new file mode 100644 index 0000000..e46d52d --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.List; + +/** + * Discovery event data for node join event ({@code ZK_EVT_NODE_JOIN}). + */ +class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { + /** Serialization version. */ + private static final long serialVersionUID = 0L; + + /** Joined nodes data. */ + final List<ZkJoinedNodeEvtData> joinedNodes; + + /** Data for joined part count. */ + final int dataForJoinedPartCnt; + + /** + * @param evtId Event ID. + * @param topVer Topology version. + * @param joinedNodes Joined nodes data. + * @param dataForJoinedPartCnt Data for joined part count. + */ + ZkDiscoveryNodeJoinEventData( + long evtId, + long topVer, + List<ZkJoinedNodeEvtData> joinedNodes, + int dataForJoinedPartCnt) + { + super(evtId, ZK_EVT_NODE_JOIN, topVer); + + this.joinedNodes = joinedNodes; + this.dataForJoinedPartCnt = dataForJoinedPartCnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "ZkDiscoveryNodeJoinEventData [" + + "evtId=" + eventId() + + ", topVer=" + topologyVersion() + + ", nodes=" + joinedNodes + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java new file mode 100644 index 0000000..174d698 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; + +/** + * Future which is completed when each node from topology either has a child znode under future path + * or is not in topology anymore; listener is called on completion. + */ +class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> { + /** Logger. */ + private final IgniteLogger log; + + /** Future path. */ + private final String futPath; + + /** Orders of nodes which did not save result and did not leave topology yet. */ + private final Set<Long> remainingNodes; + + /** Future listener, called once when future is completed. */ + private final Callable<Void> lsnr; + + /** + * @param impl Discovery impl. + * @param rtState Runtime state. + * @param futPath Future path. + * @param lsnr Future listener. + * @throws Exception If listener call failed. + */ + ZkDistributedCollectDataFuture( + ZookeeperDiscoveryImpl impl, + ZkRuntimeState rtState, + String futPath, + Callable<Void> lsnr) + throws Exception + { + this.log = impl.log(); + this.futPath = futPath; + this.lsnr = lsnr; + + ZkClusterNodes top = rtState.top; + + // Assume new nodes can not join while future is in progress. 
+ + remainingNodes = U.newHashSet(top.nodesByOrder.size()); + + for (ZookeeperClusterNode node : top.nodesByInternalId.values()) + remainingNodes.add(node.order()); + + NodeResultsWatcher watcher = new NodeResultsWatcher(rtState, impl); + + if (remainingNodes.isEmpty()) + completeAndNotifyListener(); + else { + if (log.isInfoEnabled()) { + log.info("Initialize data collect future [futPath=" + futPath + ", " + + "remainingNodes=" + remainingNodes.size() + ']'); + } + + rtState.zkClient.getChildrenAsync(futPath, watcher, watcher); + } + } + + /** + * Completes future and calls listener if future was not completed before. + * + * @throws Exception If listener call failed. + */ + private void completeAndNotifyListener() throws Exception { + if (super.onDone()) + lsnr.call(); + } + + /** + * @param futPath Future path. + * @param client Client. + * @param nodeOrder Node order. + * @param data Node result data. + * @throws Exception If failed. + */ + static void saveNodeResult(String futPath, ZookeeperClient client, long nodeOrder, byte[] data) throws Exception { + client.createIfNeeded(futPath + "/" + nodeOrder, data, CreateMode.PERSISTENT); + } + + /** + * @param futPath Future path. + * @param client Client. + * @param nodeOrder Node order. + * @return Node result data. + * @throws Exception If failed. + */ + static byte[] readNodeResult(String futPath, ZookeeperClient client, long nodeOrder) throws Exception { + return client.getData(futPath + "/" + nodeOrder); + } + + /** + * @param futResPath Result path. + * @param client Client. + * @param data Result data. + * @throws Exception If failed. + */ + static void saveResult(String futResPath, ZookeeperClient client, byte[] data) throws Exception { + client.createIfNeeded(futResPath, data, CreateMode.PERSISTENT); + } + + static byte[] readResult(ZookeeperClient client, ZkIgnitePaths paths, UUID futId) throws Exception { + return client.getData(paths.distributedFutureResultPath(futId)); + } + + /** + * @param client Client. + * @param paths Paths utils. + * @param futId Future ID. + * @param log Ignite Logger. + * @throws Exception If failed. 
+ */ + static void deleteFutureData(ZookeeperClient client, + ZkIgnitePaths paths, + UUID futId, + IgniteLogger log + ) throws Exception { + // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8189 + String evtDir = paths.distributedFutureBasePath(futId); + + try { + client.deleteAll(evtDir, + client.getChildren(evtDir), + -1); + } + catch (KeeperException.NoNodeException e) { + U.log(log, "Node for deletion was not found: " + e.getPath()); + + // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8189 + } + + client.deleteIfExists(evtDir, -1); + + client.deleteIfExists(paths.distributedFutureResultPath(futId), -1); + } + + /** + * @param top Current topology. + * @throws Exception If listener call failed. + */ + void onTopologyChange(ZkClusterNodes top) throws Exception { + if (remainingNodes.isEmpty()) + return; + + for (Iterator<Long> it = remainingNodes.iterator(); it.hasNext();) { + Long nodeOrder = it.next(); + + if (!top.nodesByOrder.containsKey(nodeOrder)) { + it.remove(); + + int remaining = remainingNodes.size(); + + if (log.isInfoEnabled()) { + log.info("ZkDistributedCollectDataFuture removed remaining failed node [node=" + nodeOrder + + ", remaining=" + remaining + + ", futPath=" + futPath + ']'); + } + + if (remaining == 0) { + completeAndNotifyListener(); + + break; + } + } + } + } + + /** + * + */ + class NodeResultsWatcher extends ZkAbstractWatcher implements AsyncCallback.Children2Callback { + /** + * @param rtState Runtime state. + * @param impl Discovery impl. 
+ */ + NodeResultsWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + super(rtState, impl); + } + + /** {@inheritDoc} */ + @Override protected void process0(WatchedEvent evt) { + if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged) + rtState.zkClient.getChildrenAsync(evt.getPath(), this, this); + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (!onProcessStart()) + return; + + try { + if (!isDone()) { + assert rc == 0 : KeeperException.Code.get(rc); + + for (int i = 0; i < children.size(); i++) { + Long nodeOrder = Long.parseLong(children.get(i)); + + if (remainingNodes.remove(nodeOrder)) { + int remaining = remainingNodes.size(); + + if (log.isInfoEnabled()) { + log.info("ZkDistributedCollectDataFuture added new result [node=" + nodeOrder + + ", remaining=" + remaining + + ", futPath=" + path + ']'); + } + + if (remaining == 0) + completeAndNotifyListener(); + } + } + } + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java new file mode 100644 index 0000000..de7291c --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; + +/** + * Zk Force Node Fail Message. + */ +public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final long nodeInternalId; + + /** */ + final String warning; + + /** + * @param nodeInternalId Node ID. + * @param warning Warning to be displayed on all nodes. 
+ */ + ZkForceNodeFailMessage(long nodeInternalId, String warning) { + this.nodeInternalId = nodeInternalId; + this.warning = warning; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkForceNodeFailMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java new file mode 100644 index 0000000..9caf00f --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; + +/** + * + */ +class ZkIgnitePaths { + /** */ + static final String PATH_SEPARATOR = "/"; + + /** */ + private static final byte CLIENT_NODE_FLAG_MASK = 0x01; + + /** */ + private static final int UUID_LEN = 36; + + /** Directory to store joined node data. */ + private static final String JOIN_DATA_DIR = "jd"; + + /** Directory to store new custom events. */ + private static final String CUSTOM_EVTS_DIR = "ce"; + + /** Directory to store parts of multi-parts custom events. */ + private static final String CUSTOM_EVTS_PARTS_DIR = "cp"; + + /** Directory to store acknowledge messages for custom events. */ + private static final String CUSTOM_EVTS_ACKS_DIR = "ca"; + + /** Directory to store EPHEMERAL znodes for alive cluster nodes. */ + static final String ALIVE_NODES_DIR = "n"; + + /** Path to store discovery events {@link ZkDiscoveryEventsData}. */ + private static final String DISCO_EVENTS_PATH = "e"; + + /** */ + final String clusterDir; + + /** */ + final String aliveNodesDir; + + /** */ + final String joinDataDir; + + /** */ + final String evtsPath; + + /** */ + final String customEvtsDir; + + /** */ + final String customEvtsPartsDir; + + /** */ + final String customEvtsAcksDir; + + /** + * @param zkRootPath Base Zookeeper directory for all Ignite nodes. + */ + ZkIgnitePaths(String zkRootPath) { + clusterDir = zkRootPath; + + aliveNodesDir = zkPath(ALIVE_NODES_DIR); + joinDataDir = zkPath(JOIN_DATA_DIR); + evtsPath = zkPath(DISCO_EVENTS_PATH); + customEvtsDir = zkPath(CUSTOM_EVTS_DIR); + customEvtsPartsDir = zkPath(CUSTOM_EVTS_PARTS_DIR); + customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR); + } + + /** + * @param path Relative path. + * @return Full path. + */ + private String zkPath(String path) { + return clusterDir + "/" + path; + } + + /** + * @param nodeId Node ID. + * @param prefixId Unique prefix ID. + * @return Path. 
+ */ + String joiningNodeDataPath(UUID nodeId, UUID prefixId) { + return joinDataDir + '/' + prefixId + ":" + nodeId.toString(); + } + + /** + * @param path Alive node zk path. + * @return Node internal ID. + */ + static long aliveInternalId(String path) { + int idx = path.lastIndexOf('|'); + + return Integer.parseInt(path.substring(idx + 1)); + } + + /** + * @param prefix Node unique path prefix. + * @param node Node. + * @return Path. + */ + String aliveNodePathForCreate(String prefix, ZookeeperClusterNode node) { + byte flags = 0; + + if (node.isClient()) + flags |= CLIENT_NODE_FLAG_MASK; + + return aliveNodesDir + "/" + prefix + ":" + node.id() + ":" + encodeFlags(flags) + "|"; + } + + /** + * @param path Alive node zk path. + * @return {@code True} if node is client. + */ + static boolean aliveNodeClientFlag(String path) { + return (aliveFlags(path) & CLIENT_NODE_FLAG_MASK) != 0; + } + + /** + * @param path Alive node zk path. + * @return Node ID. + */ + static UUID aliveNodePrefixId(String path) { + return UUID.fromString(path.substring(0, ZkIgnitePaths.UUID_LEN)); + } + + /** + * @param path Alive node zk path. + * @return Node ID. + */ + static UUID aliveNodeId(String path) { + // <uuid prefix>:<node id>:<flags>|<alive seq> + int startIdx = ZkIgnitePaths.UUID_LEN + 1; + + String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN); + + return UUID.fromString(idStr); + } + + /** + * @param path Event zk path. + * @return Event sequence number. + */ + static int customEventSequence(String path) { + int idx = path.lastIndexOf('|'); + + return Integer.parseInt(path.substring(idx + 1)); + } + + /** + * @param path Custom event zl path. + * @return Event node ID. 
+ */ + static UUID customEventSendNodeId(String path) { + // <uuid prefix>:<node id>:<partCnt>|<seq> + int startIdx = ZkIgnitePaths.UUID_LEN + 1; + + String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN); + + return UUID.fromString(idStr); + } + + /** + * @param path Event path. + * @return Event unique prefix. + */ + static String customEventPrefix(String path) { + // <uuid prefix>:<node id>:<partCnt>|<seq> + + return path.substring(0, ZkIgnitePaths.UUID_LEN); + } + + /** + * @param path Custom event zl path. + * @return Event node ID. + */ + static int customEventPartsCount(String path) { + // <uuid prefix>:<node id>:<partCnt>|<seq> + int startIdx = 2 * ZkIgnitePaths.UUID_LEN + 2; + + String cntStr = path.substring(startIdx, startIdx + 4); + + int partCnt = Integer.parseInt(cntStr); + + assert partCnt >= 1 : partCnt; + + return partCnt; + } + + /** + * @param prefix Prefix. + * @param nodeId Node ID. + * @param partCnt Parts count. + * @return Path. + */ + String createCustomEventPath(String prefix, UUID nodeId, int partCnt) { + return customEvtsDir + "/" + prefix + ":" + nodeId + ":" + String.format("%04d", partCnt) + '|'; + } + + /** + * @param prefix Prefix. + * @param nodeId Node ID. + * @return Path. + */ + String customEventPartsBasePath(String prefix, UUID nodeId) { + return customEvtsPartsDir + "/" + prefix + ":" + nodeId + ":"; + } + + /** + * @param prefix Prefix. + * @param nodeId Node ID. + * @param part Part number. + * @return Path. + */ + String customEventPartPath(String prefix, UUID nodeId, int part) { + return customEventPartsBasePath(prefix, nodeId) + String.format("%04d", part); + } + + /** + * @param evtId Event ID. + * @return Event zk path. + */ + String joinEventDataPathForJoined(long evtId) { + return evtsPath + "/fj-" + evtId; + } + + /** + * @param topVer Event topology version. + * @return Event zk path. 
+ */ + String joinEventSecuritySubjectPath(long topVer) { + return evtsPath + "/s-" + topVer; + } + + /** + * @param origEvtId ID of original custom event. + * @return Path for custom event ack. + */ + String ackEventDataPath(long origEvtId) { + assert origEvtId != 0; + + return customEvtsAcksDir + "/" + String.valueOf(origEvtId); + } + + /** + * @param id Future ID. + * @return Future path. + */ + String distributedFutureBasePath(UUID id) { + return evtsPath + "/f-" + id; + } + + /** + * @param id Future ID. + * @return Future path. + */ + String distributedFutureResultPath(UUID id) { + return evtsPath + "/fr-" + id; + } + + /** + * @param flags Flags. + * @return Flags string. + */ + private static String encodeFlags(byte flags) { + int intVal = flags + 128; + + String str = Integer.toString(intVal, 16); + + if (str.length() == 1) + str = '0' + str; + + assert str.length() == 2 : str; + + return str; + } + + /** + * @param path Alive node zk path. + * @return Flags. + */ + private static byte aliveFlags(String path) { + int startIdx = path.lastIndexOf(':') + 1; + + String flagsStr = path.substring(startIdx, startIdx + 2); + + return (byte)(Integer.parseInt(flagsStr, 16) - 128); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java new file mode 100644 index 0000000..a73312c --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +/** + * + */ +class ZkInternalJoinErrorMessage implements ZkInternalMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + transient boolean notifyNode = true; + + /** */ + final long nodeInternalId; + + /** */ + final String err; + + /** + * @param nodeInternalId Joining node internal ID. + * @param err Error message. + */ + ZkInternalJoinErrorMessage(long nodeInternalId, String err) { + this.nodeInternalId = nodeInternalId; + this.err = err; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java new file mode 100644 index 0000000..c1d56f0 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Marker interface for messages of the ZooKeeper discovery implementation.
 * <p>
 * Declares no methods; it only requires implementations to be {@link Serializable}.
 */
interface ZkInternalMessage extends Serializable {
    // No-op.
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +class ZkJoinEventDataForJoined implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final List<ZookeeperClusterNode> top; + + /** */ + private final Map<Long, byte[]> discoData; + + /** */ + private final Map<Long, Long> dupDiscoData; + + /** + * @param top Topology. + * @param discoData Discovery data. + */ + ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Long, byte[]> discoData, @Nullable Map<Long, Long> dupDiscoData) { + assert top != null; + assert discoData != null && !discoData.isEmpty(); + + this.top = top; + this.discoData = discoData; + this.dupDiscoData = dupDiscoData; + } + + byte[] discoveryDataForNode(long nodeOrder) { + assert discoData != null; + + byte[] dataBytes = discoData.get(nodeOrder); + + if (dataBytes != null) + return dataBytes; + + assert dupDiscoData != null; + + Long dupDataNode = dupDiscoData.get(nodeOrder); + + assert dupDataNode != null; + + dataBytes = discoData.get(dupDataNode); + + assert dataBytes != null; + + return dataBytes; + } + + /** + * @return Current topology. 
+ */ + List<ZookeeperClusterNode> topology() { + assert top != null; + + return top; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java new file mode 100644 index 0000000..3c367cf --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.UUID; + +/** + * Zk Joined Node Evt Data. 
+ */ +public class ZkJoinedNodeEvtData implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final long topVer; + + /** */ + final long joinedInternalId; + + /** */ + final UUID nodeId; + + /** */ + final int joinDataPartCnt; + + /** */ + final int secSubjPartCnt; + + /** */ + final UUID joinDataPrefixId; + + /** */ + transient ZkJoiningNodeData joiningNodeData; + + /** + * @param topVer Topology version for node join event. + * @param nodeId Joined node ID. + * @param joinedInternalId Joined node internal ID. + * @param joinDataPrefixId Join data unique prefix. + * @param joinDataPartCnt Join data part count. + * @param secSubjPartCnt Security subject part count. + */ + ZkJoinedNodeEvtData( + long topVer, + UUID nodeId, + long joinedInternalId, + UUID joinDataPrefixId, + int joinDataPartCnt, + int secSubjPartCnt) + { + this.topVer = topVer; + this.nodeId = nodeId; + this.joinedInternalId = joinedInternalId; + this.joinDataPrefixId = joinDataPrefixId; + this.joinDataPartCnt = joinDataPartCnt; + this.secSubjPartCnt = secSubjPartCnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "ZkJoinedNodeData [id=" + nodeId + ", order=" + topVer + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java new file mode 100644 index 0000000..ff8311d --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.Map; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +class ZkJoiningNodeData implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private int partCnt; + + /** */ + @GridToStringInclude + private ZookeeperClusterNode node; + + /** */ + @GridToStringInclude + private Map<Integer, Serializable> discoData; + + /** + * @param partCnt Number of parts in multi-parts message. + */ + ZkJoiningNodeData(int partCnt) { + this.partCnt = partCnt; + } + + /** + * @param node Node. + * @param discoData Discovery data. + */ + ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Serializable> discoData) { + assert node != null && node.id() != null : node; + assert discoData != null; + + this.node = node; + this.discoData = discoData; + } + + /** + * @return Number of parts in multi-parts message. + */ + int partCount() { + return partCnt; + } + + /** + * @return Node. + */ + ZookeeperClusterNode node() { + return node; + } + + /** + * @return Discovery data. 
+ */ + Map<Integer, Serializable> discoveryData() { + return discoData; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkJoiningNodeData.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java new file mode 100644 index 0000000..626fe74 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +class ZkNoServersMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkNoServersMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java new file mode 100644 index 0000000..2abfee3 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Result of node validation: holds either an error or a marshalled security subject.
 * The field which is not passed to the constructor stays {@code null}.
 */
class ZkNodeValidateResult {
    /** Error. */
    String err;

    /** Marshalled security subject. */
    byte[] secSubjZipBytes;

    /**
     * Creates a result holding only an error.
     *
     * @param err Error.
     */
    ZkNodeValidateResult(String err) {
        this(err, null);
    }

    /**
     * Creates a result holding only a security subject.
     *
     * @param secSubjZipBytes Marshalled security subject.
     */
    ZkNodeValidateResult(byte[] secSubjZipBytes) {
        this(null, secSubjZipBytes);
    }

    /**
     * Common initialization for both package-visible constructors.
     *
     * @param err Error.
     * @param secSubjZipBytes Marshalled security subject.
     */
    private ZkNodeValidateResult(String err, byte[] secSubjZipBytes) {
        this.err = err;
        this.secSubjZipBytes = secSubjZipBytes;
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +/** + * Zk Runnable. + */ +public abstract class ZkRunnable extends ZkAbstractCallabck implements Runnable { + /** + * @param rtState Runtime state. + * @param impl Discovery impl. + */ + ZkRunnable(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + super(rtState, impl); + } + + /** {@inheritDoc} */ + @Override public void run() { + if (!onProcessStart()) + return; + + try { + run0(); + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + + /** + * + */ + protected abstract void run0() throws Exception; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java new file mode 100644 index 0000000..cb04ac3 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.List; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.Watcher; + +/** + * + */ +class ZkRuntimeState { + /** */ + ZkWatcher watcher; + + /** */ + ZkAliveNodeDataWatcher aliveNodeDataWatcher; + + /** */ + volatile Exception errForClose; + + /** */ + final boolean prevJoined; + + /** */ + ZookeeperClient zkClient; + + /** */ + long internalOrder; + + /** */ + int joinDataPartCnt; + + /** */ + long gridStartTime; + + /** */ + volatile boolean joined; + + /** */ + ZkDiscoveryEventsData evtsData; + + /** */ + boolean crd; + + /** */ + String locNodeZkPath; + + /** */ + final ZkAliveNodeData locNodeInfo = new ZkAliveNodeData(); + + /** */ + int procEvtCnt; + + /** */ + final ZkClusterNodes top = new ZkClusterNodes(); + + /** */ + List<ClusterNode> commErrProcNodes; + + /** Timeout callback registering watcher for join error + * (set this watcher after timeout as a minor optimization). + */ + ZkTimeoutObject joinErrTo; + + /** Timeout callback set to wait for join timeout. */ + ZkTimeoutObject joinTo; + + /** Timeout callback to update processed events counter. 
*/ + ZkTimeoutObject procEvtsUpdateTo; + + /** */ + boolean updateAlives; + + /** + * @param prevJoined {@code True} if joined topology before reconnect attempt. + */ + ZkRuntimeState(boolean prevJoined) { + this.prevJoined = prevJoined; + } + + /** + * @param watcher Watcher. + * @param aliveNodeDataWatcher Alive nodes data watcher. + */ + void init(ZkWatcher watcher, ZkAliveNodeDataWatcher aliveNodeDataWatcher) { + this.watcher = watcher; + this.aliveNodeDataWatcher = aliveNodeDataWatcher; + } + + /** + * @param err Error. + */ + void onCloseStart(Exception err) { + assert err != null; + + errForClose = err; + + ZookeeperClient zkClient = this.zkClient; + + if (zkClient != null) + zkClient.onCloseStart(); + } + + /** + * + */ + interface ZkWatcher extends Watcher, AsyncCallback.Children2Callback, AsyncCallback.DataCallback { + // No-op. + } + + /** + * + */ + interface ZkAliveNodeDataWatcher extends Watcher, AsyncCallback.DataCallback { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java new file mode 100644 index 0000000..4d3d5b4 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; + +/** + * + */ +abstract class ZkTimeoutObject implements IgniteSpiTimeoutObject { + /** */ + private final IgniteUuid id = IgniteUuid.randomUuid(); + + /** */ + private final long endTime; + + /** */ + volatile boolean cancelled; + + /** + * @param timeout Timeout. + */ + ZkTimeoutObject(long timeout) { + long endTime = timeout >= 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE; + + this.endTime = endTime >= 0 ? endTime : Long.MAX_VALUE; + } + + /** {@inheritDoc} */ + @Override public final IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Override public final long endTime() { + return endTime; + } +}
