# ignite-63
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b458bd09 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b458bd09 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b458bd09 Branch: refs/heads/ignite-63 Commit: b458bd090a23639915f0fe8861c50d763f7cc80f Parents: c602549 Author: sboikov <[email protected]> Authored: Fri Jan 23 00:08:40 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Jan 23 00:08:44 2015 +0300 ---------------------------------------------------------------------- ...GridClientAbstractMultiThreadedSelfTest.java | 2 +- .../GridClientPartitionAffinitySelfTest.java | 2 +- .../internal/GridEventConsumeHandler.java | 3 +- .../org/apache/ignite/internal/GridKernal.java | 8 +- .../ignite/internal/GridKernalContext.java | 8 +- .../ignite/internal/GridKernalContextImpl.java | 8 +- .../internal/GridMessageListenHandler.java | 2 +- .../ignite/internal/IgniteMessagingImpl.java | 2 +- .../affinity/GridAffinityAssignment.java | 168 ++ .../affinity/GridAffinityAssignmentCache.java | 409 ++++ .../affinity/GridAffinityMessage.java | 164 ++ .../affinity/GridAffinityProcessor.java | 528 +++++ .../processors/affinity/GridAffinityUtils.java | 187 ++ .../GridCacheAffinityFunctionContextImpl.java | 83 + .../internal/processors/affinity/package.html | 23 + .../processors/closure/GridClosurePolicy.java | 51 + .../closure/GridClosureProcessor.java | 1744 +++++++++++++++++ .../GridMasterLeaveAwareComputeJobAdapter.java | 36 + .../closure/GridPeerDeployAwareTaskAdapter.java | 60 + .../internal/processors/closure/package.html | 23 + .../continuous/GridContinuousHandler.java | 105 + .../continuous/GridContinuousMessage.java | 256 +++ .../continuous/GridContinuousMessageType.java | 56 + .../continuous/GridContinuousProcessor.java | 1846 ++++++++++++++++++ .../dataload/GridDataLoadCacheUpdaters.java | 292 +++ 
.../dataload/GridDataLoadRequest.java | 548 ++++++ .../dataload/GridDataLoadResponse.java | 181 ++ .../dataload/GridDataLoadUpdateJob.java | 120 ++ .../dataload/GridDataLoaderFuture.java | 75 + .../dataload/GridDataLoaderProcessor.java | 318 +++ .../dataload/IgniteDataLoaderImpl.java | 1346 +++++++++++++ .../internal/processors/dataload/package.html | 23 + .../processors/fs/GridGgfsDataManager.java | 2 +- .../processors/fs/GridGgfsIpcHandler.java | 2 +- .../task/GridStreamerBroadcastTask.java | 2 +- .../streamer/task/GridStreamerQueryTask.java | 2 +- .../streamer/task/GridStreamerReduceTask.java | 2 +- .../GridTcpCommunicationMessageFactory.java | 4 +- .../affinity/GridAffinityAssignment.java | 168 -- .../affinity/GridAffinityAssignmentCache.java | 409 ---- .../affinity/GridAffinityMessage.java | 164 -- .../affinity/GridAffinityProcessor.java | 528 ----- .../processors/affinity/GridAffinityUtils.java | 187 -- .../GridCacheAffinityFunctionContextImpl.java | 83 - .../kernal/processors/affinity/package.html | 23 - .../cache/GridCacheAffinityManager.java | 2 +- .../processors/cache/GridCacheContext.java | 2 +- .../GridCacheContinuousQueryAdapter.java | 2 +- .../GridCacheContinuousQueryHandler.java | 2 +- .../processors/closure/GridClosurePolicy.java | 51 - .../closure/GridClosureProcessor.java | 1744 ----------------- .../GridMasterLeaveAwareComputeJobAdapter.java | 36 - .../closure/GridPeerDeployAwareTaskAdapter.java | 60 - .../grid/kernal/processors/closure/package.html | 23 - .../continuous/GridContinuousHandler.java | 105 - .../continuous/GridContinuousMessage.java | 256 --- .../continuous/GridContinuousMessageType.java | 56 - .../continuous/GridContinuousProcessor.java | 1846 ------------------ .../dataload/GridDataLoadCacheUpdaters.java | 292 --- .../dataload/GridDataLoadRequest.java | 548 ------ .../dataload/GridDataLoadResponse.java | 181 -- .../dataload/GridDataLoadUpdateJob.java | 120 -- .../dataload/GridDataLoaderFuture.java | 75 - 
.../dataload/GridDataLoaderProcessor.java | 318 --- .../dataload/IgniteDataLoaderImpl.java | 1346 ------------- .../kernal/processors/dataload/package.html | 23 - .../GridAffinityProcessorAbstractSelfTest.java | 194 ++ ...AffinityProcessorConsistentHashSelfTest.java | 31 + ...GridAffinityProcessorRendezvousSelfTest.java | 31 + .../IgniteCacheEntryListenerAbstractTest.java | 2 +- .../closure/GridClosureProcessorRemoteTest.java | 119 ++ .../closure/GridClosureProcessorSelfTest.java | 541 +++++ .../internal/processors/closure/package.html | 23 + .../continuous/GridEventConsumeSelfTest.java | 1079 ++++++++++ .../continuous/GridMessageListenSelfTest.java | 489 +++++ .../dataload/GridDataLoaderImplSelfTest.java | 215 ++ .../dataload/GridDataLoaderPerformanceTest.java | 215 ++ .../GridDataLoaderProcessorSelfTest.java | 883 +++++++++ .../GridServiceProcessorAbstractSelfTest.java | 2 +- .../util/future/GridFutureAdapterSelfTest.java | 2 +- .../GridCachePartitionFairAffinitySelfTest.java | 2 +- .../GridAffinityProcessorAbstractSelfTest.java | 194 -- ...AffinityProcessorConsistentHashSelfTest.java | 31 - ...GridAffinityProcessorRendezvousSelfTest.java | 31 - .../cache/GridCacheAffinityApiSelfTest.java | 2 +- ...dCachePartitionedQueueEntryMoveSelfTest.java | 2 +- ...ridCacheContinuousQueryAbstractSelfTest.java | 2 +- .../closure/GridClosureProcessorRemoteTest.java | 119 -- .../closure/GridClosureProcessorSelfTest.java | 541 ----- .../grid/kernal/processors/closure/package.html | 23 - .../continuous/GridEventConsumeSelfTest.java | 1079 ---------- .../continuous/GridMessageListenSelfTest.java | 489 ----- .../dataload/GridDataLoaderImplSelfTest.java | 215 -- .../dataload/GridDataLoaderPerformanceTest.java | 215 -- .../GridDataLoaderProcessorSelfTest.java | 883 --------- .../testsuites/bamboo/GridBasicTestSuite.java | 6 +- .../bamboo/GridDataGridTestSuite.java | 2 +- 97 files changed, 12501 insertions(+), 12502 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/clients/src/test/java/org/apache/ignite/client/GridClientAbstractMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/client/GridClientAbstractMultiThreadedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/client/GridClientAbstractMultiThreadedSelfTest.java index a9c1ec0..6ce379d 100644 --- a/modules/clients/src/test/java/org/apache/ignite/client/GridClientAbstractMultiThreadedSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/client/GridClientAbstractMultiThreadedSelfTest.java @@ -29,7 +29,7 @@ import org.apache.ignite.resources.*; import org.apache.ignite.client.balancer.*; import org.apache.ignite.client.impl.*; import org.apache.ignite.client.ssl.*; -import org.gridgain.grid.kernal.processors.affinity.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/clients/src/test/java/org/apache/ignite/client/impl/GridClientPartitionAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/client/impl/GridClientPartitionAffinitySelfTest.java b/modules/clients/src/test/java/org/apache/ignite/client/impl/GridClientPartitionAffinitySelfTest.java index de8de76..3f3b624 100644 --- a/modules/clients/src/test/java/org/apache/ignite/client/impl/GridClientPartitionAffinitySelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/client/impl/GridClientPartitionAffinitySelfTest.java @@ -21,7 +21,7 @@ import org.apache.ignite.cache.affinity.*; import 
org.apache.ignite.cache.affinity.consistenthash.*; import org.apache.ignite.cluster.*; import org.apache.ignite.client.*; -import org.gridgain.grid.kernal.processors.affinity.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.typedef.*; import org.gridgain.testframework.*; import org.gridgain.testframework.junits.common.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 9aaddc5..4c224f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -20,13 +20,12 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.continuous.*; +import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java index 5c923bd..f6eb037 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java @@ -47,12 +47,12 @@ import org.apache.ignite.internal.managers.loadbalancer.*; import org.apache.ignite.internal.managers.securesession.*; import org.apache.ignite.internal.managers.security.*; import org.apache.ignite.internal.managers.swapspace.*; -import org.gridgain.grid.kernal.processors.affinity.*; +import org.apache.ignite.internal.processors.affinity.*; import org.gridgain.grid.kernal.processors.cache.*; import org.apache.ignite.internal.processors.clock.*; -import org.gridgain.grid.kernal.processors.closure.*; -import org.gridgain.grid.kernal.processors.continuous.*; -import org.gridgain.grid.kernal.processors.dataload.*; +import org.apache.ignite.internal.processors.closure.*; +import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.internal.processors.dataload.*; import org.apache.ignite.internal.processors.email.*; import org.apache.ignite.internal.processors.interop.*; import org.apache.ignite.internal.processors.job.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 132497e..e93bbd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -33,12 +33,12 @@ import org.apache.ignite.internal.managers.indexing.*; import org.apache.ignite.internal.managers.loadbalancer.*; import 
org.apache.ignite.internal.managers.securesession.*; import org.apache.ignite.internal.managers.swapspace.*; -import org.gridgain.grid.kernal.processors.affinity.*; +import org.apache.ignite.internal.processors.affinity.*; import org.gridgain.grid.kernal.processors.cache.*; import org.apache.ignite.internal.processors.clock.*; -import org.gridgain.grid.kernal.processors.closure.*; -import org.gridgain.grid.kernal.processors.continuous.*; -import org.gridgain.grid.kernal.processors.dataload.*; +import org.apache.ignite.internal.processors.closure.*; +import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.internal.processors.dataload.*; import org.apache.ignite.internal.processors.email.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.interop.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 26c69b9..7a6e9fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -33,14 +33,14 @@ import org.apache.ignite.internal.managers.loadbalancer.*; import org.apache.ignite.internal.managers.securesession.*; import org.apache.ignite.internal.managers.security.*; import org.apache.ignite.internal.managers.swapspace.*; -import org.gridgain.grid.kernal.processors.affinity.*; +import org.apache.ignite.internal.processors.affinity.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.dr.*; import org.gridgain.grid.kernal.processors.cache.dr.os.*; import 
org.apache.ignite.internal.processors.clock.*; -import org.gridgain.grid.kernal.processors.closure.*; -import org.gridgain.grid.kernal.processors.continuous.*; -import org.gridgain.grid.kernal.processors.dataload.*; +import org.apache.ignite.internal.processors.closure.*; +import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.internal.processors.dataload.*; import org.apache.ignite.internal.processors.email.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.interop.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index abff829..9aca42c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.lang.*; import org.apache.ignite.internal.managers.deployment.*; -import org.gridgain.grid.kernal.processors.continuous.*; +import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java 
index d9bdc3b..fe82ac5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.continuous.*; +import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java new file mode 100644 index 0000000..580f64c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.cluster.*; + +import java.io.*; +import java.util.*; + +/** + * Cached affinity calculations. + */ +class GridAffinityAssignment implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Topology version. */ + private final long topVer; + + /** Collection of calculated affinity nodes. */ + private List<List<ClusterNode>> assignment; + + /** Map of primary node partitions. */ + private final Map<UUID, Set<Integer>> primary; + + /** Map of backup node partitions. */ + private final Map<UUID, Set<Integer>> backup; + + /** + * Constructs cached affinity calculations item. + * + * @param topVer Topology version. + */ + GridAffinityAssignment(long topVer) { + this.topVer = topVer; + primary = new HashMap<>(); + backup = new HashMap<>(); + } + + /** + * @param topVer Topology version. + * @param assignment Assignment. + */ + GridAffinityAssignment(long topVer, List<List<ClusterNode>> assignment) { + this.topVer = topVer; + this.assignment = assignment; + + primary = new HashMap<>(); + backup = new HashMap<>(); + + initPrimaryBackupMaps(); + } + + /** + * @return Affinity assignment. + */ + public List<List<ClusterNode>> assignment() { + return assignment; + } + + /** + * @return Topology version. + */ + public long topologyVersion() { + return topVer; + } + + /** + * Get affinity nodes for partition. + * + * @param part Partition. + * @return Affinity nodes. + */ + public List<ClusterNode> get(int part) { + assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" + + " [part=" + part + ", partitions=" + assignment.size() + ']'; + + return assignment.get(part); + } + + /** + * Get primary partitions for specified node ID. + * + * @param nodeId Node ID to get primary partitions for. 
+ * @return Primary partitions for specified node ID. + */ + public Set<Integer> primaryPartitions(UUID nodeId) { + Set<Integer> set = primary.get(nodeId); + + return set == null ? Collections.<Integer>emptySet() : Collections.unmodifiableSet(set); + } + + /** + * Get backup partitions for specified node ID. + * + * @param nodeId Node ID to get backup partitions for. + * @return Backup partitions for specified node ID. + */ + public Set<Integer> backupPartitions(UUID nodeId) { + Set<Integer> set = backup.get(nodeId); + + return set == null ? Collections.<Integer>emptySet() : Collections.unmodifiableSet(set); + } + + /** + * Initializes primary and backup maps. + */ + private void initPrimaryBackupMaps() { + // Temporary mirrors with modifiable partition's collections. + Map<UUID, Set<Integer>> tmpPrm = new HashMap<>(); + Map<UUID, Set<Integer>> tmpBkp = new HashMap<>(); + + for (int partsCnt = assignment.size(), p = 0; p < partsCnt; p++) { + // Use the first node as primary, other - backups. + Map<UUID, Set<Integer>> tmp = tmpPrm; + Map<UUID, Set<Integer>> map = primary; + + for (ClusterNode node : assignment.get(p)) { + UUID id = node.id(); + + Set<Integer> set = tmp.get(id); + + if (set == null) { + tmp.put(id, set = new HashSet<>()); + map.put(id, Collections.unmodifiableSet(set)); + } + + set.add(p); + + // Use the first node as primary, other - backups. 
+ tmp = tmpBkp; + map = backup; + } + } + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(topVer ^ (topVer >>> 32)); + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + return topVer == ((GridAffinityAssignment)o).topVer; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java new file mode 100644 index 0000000..4730512 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.portables.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Affinity cached function. + */ +public class GridAffinityAssignmentCache { + /** Node order comparator. */ + private static final Comparator<ClusterNode> nodeCmp = new GridNodeOrderComparator(); + + /** Cache name. */ + private final String cacheName; + + /** Number of backups. */ + private int backups; + + /** Affinity function. */ + private final GridCacheAffinityFunction aff; + + /** Partitions count. */ + private final int partsCnt; + + /** Affinity mapper function. */ + private final GridCacheAffinityKeyMapper affMapper; + + /** Affinity calculation results cache: topology version => partition => nodes. */ + private final ConcurrentMap<Long, GridAffinityAssignment> affCache; + + /** Cache item corresponding to the head topology version. */ + private final AtomicReference<GridAffinityAssignment> head; + + /** Discovery manager. */ + private final GridCacheContext ctx; + + /** Ready futures. */ + private final ConcurrentMap<Long, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>(); + + /** Log. */ + private IgniteLogger log; + + /** + * Constructs affinity cached calculations. + * + * @param ctx Kernal context. + * @param cacheName Cache name. + * @param aff Affinity function. + * @param affMapper Affinity key mapper. 
+ */ + @SuppressWarnings("unchecked") + public GridAffinityAssignmentCache(GridCacheContext ctx, String cacheName, GridCacheAffinityFunction aff, + GridCacheAffinityKeyMapper affMapper, int backups) { + this.ctx = ctx; + this.aff = aff; + this.affMapper = affMapper; + this.cacheName = cacheName; + this.backups = backups; + + log = ctx.logger(GridAffinityAssignmentCache.class); + + partsCnt = aff.partitions(); + affCache = new ConcurrentLinkedHashMap<>(); + head = new AtomicReference<>(new GridAffinityAssignment(-1)); + } + + /** + * Initializes affinity with given topology version and assignment. The assignment is calculated on remote nodes + * and brought to local node on partition map exchange. + * + * @param topVer Topology version. + * @param affAssignment Affinity assignment for topology version. + */ + public void initialize(long topVer, List<List<ClusterNode>> affAssignment) { + GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment); + + affCache.put(topVer, assignment); + head.set(assignment); + + for (Map.Entry<Long, AffinityReadyFuture> entry : readyFuts.entrySet()) { + if (entry.getKey() >= topVer) + entry.getValue().onDone(topVer); + } + } + + /** + * Calculates affinity cache for given topology version. + * + * @param topVer Topology version to calculate affinity cache for. + * @param discoEvt Discovery event that caused this topology version change. + */ + public List<List<ClusterNode>> calculate(long topVer, IgniteDiscoveryEvent discoEvt) { + if (log.isDebugEnabled()) + log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + + ", discoEvt=" + discoEvt + ']'); + + GridAffinityAssignment prev = affCache.get(topVer - 1); + + List<ClusterNode> sorted; + + if (ctx.isLocal()) + // For local cache always use local node. + sorted = Collections.singletonList(ctx.localNode()); + else { + // Resolve nodes snapshot for specified topology version. 
+ Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer); + + sorted = sort(nodes); + } + + List<List<ClusterNode>> prevAssignment = prev == null ? null : prev.assignment(); + + List<List<ClusterNode>> assignment = aff.assignPartitions( + new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, topVer, backups)); + + GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment); + + updated = F.addIfAbsent(affCache, topVer, updated); + + // Update top version, if required. + while (true) { + GridAffinityAssignment headItem = head.get(); + + if (headItem.topologyVersion() >= topVer) + break; + + if (head.compareAndSet(headItem, updated)) + break; + } + + for (Map.Entry<Long, AffinityReadyFuture> entry : readyFuts.entrySet()) { + if (entry.getKey() <= topVer) { + if (log.isDebugEnabled()) + log.debug("Completing topology ready future (calculated affinity) [locNodeId=" + ctx.localNodeId() + + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']'); + + entry.getValue().onDone(topVer); + } + } + + return updated.assignment(); + } + + /** + * @return Last calculated affinity version. + */ + public long lastVersion() { + return head.get().topologyVersion(); + } + + /** + * Clean up outdated cache items. + * + * @param topVer Actual topology version, older versions will be removed. + */ + public void cleanUpCache(long topVer) { + if (log.isDebugEnabled()) + log.debug("Cleaning up cache for version [locNodeId=" + ctx.localNodeId() + + ", topVer=" + topVer + ']'); + + for (Iterator<Long> it = affCache.keySet().iterator(); it.hasNext(); ) + if (it.next() < topVer) + it.remove(); + } + + /** + * @param topVer Topology version. + * @return Affinity assignment. 
+ */ + public List<List<ClusterNode>> assignments(long topVer) { + GridAffinityAssignment aff = cachedAffinity(topVer); + + return aff.assignment(); + } + + /** + * Gets future that will be completed after topology with version {@code topVer} is calculated. + * + * @param topVer Topology version to await for. + * @return Future that will be completed after affinity for topology version {@code topVer} is calculated. + */ + public IgniteFuture<Long> readyFuture(long topVer) { + GridAffinityAssignment aff = head.get(); + + if (aff.topologyVersion() >= topVer) { + if (log.isDebugEnabled()) + log.debug("Returning finished future for readyFuture [head=" + aff.topologyVersion() + + ", topVer=" + topVer + ']'); + + return null; + } + + GridFutureAdapter<Long> fut = F.addIfAbsent(readyFuts, topVer, + new AffinityReadyFuture(ctx.kernalContext())); + + aff = head.get(); + + if (aff.topologyVersion() >= topVer) { + if (log.isDebugEnabled()) + log.debug("Completing topology ready future right away [head=" + aff.topologyVersion() + + ", topVer=" + topVer + ']'); + + fut.onDone(topVer); + } + + return fut; + } + + /** + * @return Partition count. + */ + public int partitions() { + return partsCnt; + } + + /** + * NOTE: Use this method always when you need to calculate partition id for + * a key provided by user. It's required since we should apply affinity mapper + * logic in order to find a key that will eventually be passed to affinity function. + * + * @param key Key. + * @return Partition. + */ + public int partition(Object key) { + if (ctx.portableEnabled()) { + try { + key = ctx.marshalToPortable(key); + } + catch (PortableException e) { + U.error(log, "Failed to marshal key to portable: " + key, e); + } + } + + return aff.partition(affMapper.affinityKey(key)); + } + + /** + * Gets affinity nodes for specified partition. + * + * @param part Partition. + * @param topVer Topology version. + * @return Affinity nodes. 
+ */ + public List<ClusterNode> nodes(int part, long topVer) { + // Resolve cached affinity nodes. + return cachedAffinity(topVer).get(part); + } + + /** + * Get primary partitions for specified node ID. + * + * @param nodeId Node ID to get primary partitions for. + * @param topVer Topology version. + * @return Primary partitions for specified node ID. + */ + public Set<Integer> primaryPartitions(UUID nodeId, long topVer) { + return cachedAffinity(topVer).primaryPartitions(nodeId); + } + + /** + * Get backup partitions for specified node ID. + * + * @param nodeId Node ID to get backup partitions for. + * @param topVer Topology version. + * @return Backup partitions for specified node ID. + */ + public Set<Integer> backupPartitions(UUID nodeId, long topVer) { + return cachedAffinity(topVer).backupPartitions(nodeId); + } + + /** + * Get cached affinity for specified topology version. + * + * @param topVer Topology version. + * @return Cached affinity. + */ + private GridAffinityAssignment cachedAffinity(long topVer) { + if (topVer == -1) + topVer = lastVersion(); + else + awaitTopologyVersion(topVer); + + assert topVer >= 0 : topVer; + + GridAffinityAssignment cache = head.get(); + + if (cache.topologyVersion() != topVer) { + cache = affCache.get(topVer); + + if (cache == null) { + throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " + + "calculated [locNodeId=" + ctx.localNodeId() + ", topVer=" + topVer + + ", head=" + head.get().topologyVersion() + ']'); + } + } + + assert cache.topologyVersion() == topVer : "Invalid cached affinity: " + cache; + + return cache; + } + + /** + * @param topVer Topology version to wait. 
+ */ + private void awaitTopologyVersion(long topVer) { + GridAffinityAssignment aff = head.get(); + + if (aff.topologyVersion() >= topVer) + return; + + try { + if (log.isDebugEnabled()) + log.debug("Will wait for topology version [locNodeId=" + ctx.localNodeId() + + ", topVer=" + topVer + ']'); + + IgniteFuture<Long> fut = readyFuture(topVer); + + if (fut != null) + fut.get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for affinity ready future for topology version: " + topVer, + e); + } + } + + /** + * Sorts nodes according to order. + * + * @param nodes Nodes to sort. + * @return Sorted list of nodes. + */ + private List<ClusterNode> sort(Collection<ClusterNode> nodes) { + List<ClusterNode> sorted = new ArrayList<>(nodes.size()); + + sorted.addAll(nodes); + + Collections.sort(sorted, nodeCmp); + + return sorted; + } + + /** + * Affinity ready future. Will remove itself from ready futures map. + */ + private class AffinityReadyFuture extends GridFutureAdapter<Long> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public AffinityReadyFuture() { + // No-op. + } + + /** + * @param ctx Kernal context. 
+ */ + private AffinityReadyFuture(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(Long res, @Nullable Throwable err) { + assert res != null; + + boolean done = super.onDone(res, err); + + if (done) + readyFuts.remove(res, this); + + return done; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityMessage.java new file mode 100644 index 0000000..1e67237 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityMessage.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.tostring.*; + +import java.io.*; +import java.util.*; + +/** + * Object wrapper containing serialized byte array of original object and deployment information. + */ +class GridAffinityMessage implements Externalizable, IgniteOptimizedMarshallable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"}) + private static Object GG_CLASS_ID; + + /** */ + private byte[] src; + + /** */ + private IgniteUuid clsLdrId; + + /** */ + private IgniteDeploymentMode depMode; + + /** */ + private String srcClsName; + + /** */ + private String userVer; + + /** Node class loader participants. */ + @GridToStringInclude + private Map<UUID, IgniteUuid> ldrParties; + + /** + * @param src Source object. + * @param srcClsName Source object class name. + * @param clsLdrId Class loader ID. + * @param depMode Deployment mode. + * @param userVer User version. + * @param ldrParties Node loader participant map. + */ + GridAffinityMessage( + byte[] src, + String srcClsName, + IgniteUuid clsLdrId, + IgniteDeploymentMode depMode, + String userVer, + Map<UUID, IgniteUuid> ldrParties) { + this.src = src; + this.srcClsName = srcClsName; + this.depMode = depMode; + this.clsLdrId = clsLdrId; + this.userVer = userVer; + this.ldrParties = ldrParties; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridAffinityMessage() { + // No-op. + } + + /** + * @return Source object. + */ + public byte[] source() { + return src; + } + + /** + * @return the Class loader ID. + */ + public IgniteUuid classLoaderId() { + return clsLdrId; + } + + /** + * @return Deployment mode. 
+ */ + public IgniteDeploymentMode deploymentMode() { + return depMode; + } + + /** + * @return Source message class name. + */ + public String sourceClassName() { + return srcClsName; + } + + /** + * @return User version. + */ + public String userVersion() { + return userVer; + } + + /** + * @return Node class loader participant map. + */ + public Map<UUID, IgniteUuid> loaderParticipants() { + return ldrParties != null ? Collections.unmodifiableMap(ldrParties) : null; + } + + /** {@inheritDoc} */ + @Override public Object ggClassId() { + return GG_CLASS_ID; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeByteArray(out, src); + + out.writeInt(depMode.ordinal()); + + U.writeGridUuid(out, clsLdrId); + U.writeString(out, srcClsName); + U.writeString(out, userVer); + U.writeMap(out, ldrParties); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + src = U.readByteArray(in); + + depMode = IgniteDeploymentMode.fromOrdinal(in.readInt()); + + clsLdrId = U.readGridUuid(in); + srcClsName = U.readString(in); + userVer = U.readString(in); + ldrParties = U.readMap(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridAffinityMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java new file mode 100644 index 0000000..4d7a466 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -0,0 +1,528 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.timeout.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.internal.GridClosureCallMode.*; +import static org.apache.ignite.internal.processors.affinity.GridAffinityUtils.*; + +/** + * Data affinity 
processor. + */ +public class GridAffinityProcessor extends GridProcessorAdapter { + /** Affinity map cleanup delay (ms). */ + private static final long AFFINITY_MAP_CLEAN_UP_DELAY = 3000; + + /** Retries to get affinity in case of error. */ + private static final int ERROR_RETRIES = 3; + + /** Time to wait between errors (in milliseconds). */ + private static final long ERROR_WAIT = 500; + + /** Null cache name. */ + private static final String NULL_NAME = U.id8(UUID.randomUUID()); + + /** Affinity map. */ + private final ConcurrentMap<AffinityAssignmentKey, IgniteFuture<AffinityInfo>> affMap = new ConcurrentHashMap8<>(); + + /** Listener. */ + private final GridLocalEventListener lsnr = new GridLocalEventListener() { + @Override public void onEvent(IgniteEvent evt) { + int evtType = evt.type(); + + assert evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT || evtType == EVT_NODE_JOINED; + + if (affMap.isEmpty()) + return; // Skip empty affinity map. + + final IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; + + // Clean up affinity functions if such cache no more exists. + if (evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) { + final Collection<String> caches = new HashSet<>(); + + for (ClusterNode clusterNode : ctx.discovery().allNodes()) + caches.addAll(U.cacheNames(clusterNode)); + + final Collection<AffinityAssignmentKey> rmv = new GridLeanSet<>(); + + for (AffinityAssignmentKey key : affMap.keySet()) { + if (!caches.contains(key.cacheName) || key.topVer < discoEvt.topologyVersion() - 1) + rmv.add(key); + } + + ctx.timeout().addTimeoutObject(new GridTimeoutObjectAdapter( + IgniteUuid.fromUuid(ctx.localNodeId()), AFFINITY_MAP_CLEAN_UP_DELAY) { + @Override public void onTimeout() { + affMap.keySet().removeAll(rmv); + } + }); + } + } + }; + + /** + * @param ctx Context. 
+ */ + public GridAffinityProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void onKernalStart() throws IgniteCheckedException { + ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + if (ctx != null && ctx.event() != null) + ctx.event().removeLocalEventListener(lsnr); + } + + /** + * Maps keys to nodes for given cache. + * + * @param cacheName Cache name. + * @param keys Keys to map. + * @return Map of nodes to keys. + * @throws IgniteCheckedException If failed. + */ + public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName, + @Nullable Collection<? extends K> keys) throws IgniteCheckedException { + return keysToNodes(cacheName, keys); + } + + /** + * Maps keys to nodes on default cache. + * + * @param keys Keys to map. + * @return Map of nodes to keys. + * @throws IgniteCheckedException If failed. + */ + public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) + throws IgniteCheckedException { + return keysToNodes(null, keys); + } + + /** + * Maps single key to a node. + * + * @param cacheName Cache name. + * @param key Key to map. + * @return Picked node. + * @throws IgniteCheckedException If failed. + */ + @Nullable public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteCheckedException { + Map<ClusterNode, Collection<K>> map = keysToNodes(cacheName, F.asList(key)); + + return map != null ? F.first(map.keySet()) : null; + } + + /** + * Maps single key to a node. + * + * @param cacheName Cache name. + * @param key Key to map. + * @return Picked node. + * @throws IgniteCheckedException If failed. 
+ */ + @Nullable public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key, long topVer) throws IgniteCheckedException { + Map<ClusterNode, Collection<K>> map = keysToNodes(cacheName, F.asList(key), topVer); + + return map != null ? F.first(map.keySet()) : null; + } + + /** + * Maps single key to a node on default cache. + * + * @param key Key to map. + * @return Picked node. + * @throws IgniteCheckedException If failed. + */ + @Nullable public <K> ClusterNode mapKeyToNode(K key) throws IgniteCheckedException { + return mapKeyToNode(null, key); + } + + /** + * Gets affinity key for cache key. + * + * @param cacheName Cache name. + * @param key Cache key. + * @return Affinity key. + * @throws IgniteCheckedException In case of error. + */ + @SuppressWarnings("unchecked") + @Nullable public Object affinityKey(@Nullable String cacheName, @Nullable Object key) throws IgniteCheckedException { + if (key == null) + return null; + + AffinityInfo affInfo = affinityCache(cacheName, ctx.discovery().topologyVersion()); + + if (affInfo == null || affInfo.mapper == null) + return null; + + if (affInfo.portableEnabled) + key = ctx.portable().marshalToPortable(key); + + return affInfo.mapper.affinityKey(key); + } + + /** + * @param cacheName Cache name. + * @return Non-null cache name. + */ + private String maskNull(@Nullable String cacheName) { + return cacheName == null ? NULL_NAME : cacheName; + } + + /** + * @param cacheName Cache name. + * @param keys Keys. + * @return Affinity map. + * @throws IgniteCheckedException If failed. + */ + private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName, + Collection<? extends K> keys) throws IgniteCheckedException { + return keysToNodes(cacheName, keys, ctx.discovery().topologyVersion()); + } + + /** + * @param cacheName Cache name. + * @param keys Keys. + * @param topVer Topology version. + * @return Affinity map. + * @throws IgniteCheckedException If failed. 
+ */ + private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName, + Collection<? extends K> keys, long topVer) throws IgniteCheckedException { + if (F.isEmpty(keys)) + return Collections.emptyMap(); + + ClusterNode loc = ctx.discovery().localNode(); + + if (U.hasCache(loc, cacheName) && ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL) + return F.asMap(loc, (Collection<K>)keys); + + AffinityInfo affInfo = affinityCache(cacheName, topVer); + + return affInfo != null ? affinityMap(affInfo, keys) : Collections.<ClusterNode, Collection<K>>emptyMap(); + } + + /** + * @param cacheName Cache name. + * @return Affinity cache. + * @throws IgniteCheckedException In case of error. + */ + @SuppressWarnings("ErrorNotRethrown") + private AffinityInfo affinityCache(@Nullable final String cacheName, long topVer) throws IgniteCheckedException { + AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer); + + IgniteFuture<AffinityInfo> fut = affMap.get(key); + + if (fut != null) + return fut.get(); + + ClusterNode loc = ctx.discovery().localNode(); + + // Check local node. 
+ if (U.hasCache(loc, cacheName)) { + GridCacheContext<Object,Object> cctx = ctx.cache().internalCache(cacheName).context(); + + AffinityInfo info = new AffinityInfo( + cctx.config().getAffinity(), + cctx.config().getAffinityMapper(), + new GridAffinityAssignment(topVer, cctx.affinity().assignments(topVer)), + cctx.portableEnabled()); + + IgniteFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(ctx, info)); + + if (old != null) + info = old.get(); + + return info; + } + + Collection<ClusterNode> cacheNodes = F.view( + ctx.discovery().remoteNodes(), + new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode n) { + return U.hasCache(n, cacheName); + } + }); + + if (F.isEmpty(cacheNodes)) + return null; + + GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>(); + + IgniteFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0); + + if (old != null) + return old.get(); + + int max = ERROR_RETRIES; + int cnt = 0; + + Iterator<ClusterNode> it = cacheNodes.iterator(); + + // We are here because affinity has not been fetched yet, or cache mode is LOCAL. + while (true) { + cnt++; + + if (!it.hasNext()) + it = cacheNodes.iterator(); + + // Double check since we deal with dynamic view. + if (!it.hasNext()) + // Exception will be caught in this method. + throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName); + + ClusterNode n = it.next(); + + GridCacheMode mode = U.cacheMode(n, cacheName); + + assert mode != null; + + // Map all keys to a single node, if the cache mode is LOCAL. + if (mode == LOCAL) { + fut0.onDone(new IgniteCheckedException("Failed to map keys for LOCAL cache.")); + + // Will throw exception. + fut0.get(); + } + + try { + // Resolve cache context for remote node. + // Set affinity function before counting down on latch. 
+ fut0.onDone(affinityInfoFromNode(cacheName, topVer, n)); + + break; + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to get affinity from node (will retry) [cache=" + cacheName + + ", node=" + U.toShortString(n) + ", msg=" + e.getMessage() + ']'); + + if (cnt < max) { + U.sleep(ERROR_WAIT); + + continue; + } + + affMap.remove(maskNull(cacheName), fut0); + + fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e)); + + break; + } + catch (RuntimeException | Error e) { + fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e)); + + break; + } + } + + return fut0.get(); + } + + /** + * Requests {@link GridCacheAffinityFunction} and {@link org.apache.ignite.cache.affinity.GridCacheAffinityKeyMapper} from remote node. + * + * @param cacheName Name of cache on which affinity is requested. + * @param n Node from which affinity is requested. + * @return Affinity cached function. + * @throws IgniteCheckedException If either local or remote node cannot get deployment for affinity objects. + */ + private AffinityInfo affinityInfoFromNode(@Nullable String cacheName, long topVer, ClusterNode n) + throws IgniteCheckedException { + GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = ctx.closure() + .callAsyncNoFailover(BALANCE, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/).get(); + + GridCacheAffinityFunction f = (GridCacheAffinityFunction)unmarshall(ctx, n.id(), t.get1()); + GridCacheAffinityKeyMapper m = (GridCacheAffinityKeyMapper)unmarshall(ctx, n.id(), t.get2()); + + assert m != null; + + // Bring to initial state. + f.reset(); + m.reset(); + + Boolean portableEnabled = U.portableEnabled(n, cacheName); + + return new AffinityInfo(f, m, t.get3(), portableEnabled != null && portableEnabled); + } + + /** + * @param aff Affinity function. + * @param keys Keys. + * @return Affinity map. 
+ * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings({"unchecked"}) + private <K> Map<ClusterNode, Collection<K>> affinityMap(AffinityInfo aff, Collection<? extends K> keys) + throws IgniteCheckedException { + assert aff != null; + assert !F.isEmpty(keys); + + try { + if (keys.size() == 1) + return Collections.singletonMap(primary(aff, F.first(keys)), (Collection<K>)keys); + + Map<ClusterNode, Collection<K>> map = new GridLeanMap<>(); + + for (K k : keys) { + ClusterNode n = primary(aff, k); + + Collection<K> mapped = map.get(n); + + if (mapped == null) + map.put(n, mapped = new LinkedList<>()); + + mapped.add(k); + } + + return map; + } + catch (IgniteException e) { + // Affinity calculation may lead to IgniteException if no cache nodes found for pair cacheName+topVer. + throw new IgniteCheckedException("Failed to get affinity map for keys: " + keys, e); + } + } + + /** + * Get primary node for cached key. + * + * @param aff Affinity function. + * @param key Key to check. + * @return Primary node for given key. + * @throws IgniteCheckedException In case of error. + */ + private <K> ClusterNode primary(AffinityInfo aff, K key) throws IgniteCheckedException { + int part = aff.affFunc.partition(aff.mapper.affinityKey(key)); + + Collection<ClusterNode> nodes = aff.assignment.get(part); + + if (F.isEmpty(nodes)) + throw new IgniteCheckedException("Failed to get affinity nodes [aff=" + aff + ", key=" + key + ']'); + + return nodes.iterator().next(); + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { + X.println(">>>"); + X.println(">>> Affinity processor memory stats [grid=" + ctx.gridName() + ']'); + X.println(">>> affMapSize: " + affMap.size()); + } + + /** + * + */ + private static class AffinityInfo { + /** Affinity function. */ + private GridCacheAffinityFunction affFunc; + + /** Mapper */ + private GridCacheAffinityKeyMapper mapper; + + /** Assignment. 
*/ + private GridAffinityAssignment assignment; + + /** Portable enabled flag. */ + private boolean portableEnabled; + + /** + * @param affFunc Affinity function. + * @param mapper Affinity key mapper. + * @param assignment Partition assignment. + * @param portableEnabled Portable enabled flag. + */ + private AffinityInfo(GridCacheAffinityFunction affFunc, GridCacheAffinityKeyMapper mapper, + GridAffinityAssignment assignment, boolean portableEnabled) { + this.affFunc = affFunc; + this.mapper = mapper; + this.assignment = assignment; + this.portableEnabled = portableEnabled; + } + } + + /** + * + */ + private static class AffinityAssignmentKey { + /** */ + private String cacheName; + + /** */ + private long topVer; + + /** + * @param cacheName Cache name. + * @param topVer Topology version. + */ + private AffinityAssignmentKey(String cacheName, long topVer) { + this.cacheName = cacheName; + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof AffinityAssignmentKey)) + return false; + + AffinityAssignmentKey that = (AffinityAssignmentKey)o; + + return topVer == that.topVer && F.eq(cacheName, that.cacheName); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = cacheName != null ? 
cacheName.hashCode() : 0; + + res = 31 * res + (int)(topVer ^ (topVer >>> 32)); + + return res; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java new file mode 100644 index 0000000..07bf390 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Affinity utility methods. + */ +class GridAffinityUtils { + /** + * Creates a job that will look up {@link org.apache.ignite.cache.affinity.GridCacheAffinityKeyMapper} and {@link org.apache.ignite.cache.affinity.GridCacheAffinityFunction} on a + * cache with given name. If they exist, this job will serialize and transfer them together with all deployment + * information needed to unmarshal objects on remote node. Result is returned as a {@link GridTuple3}, + * where first object is {@link GridAffinityMessage} for {@link org.apache.ignite.cache.affinity.GridCacheAffinityFunction}, second object + * is {@link GridAffinityMessage} for {@link org.apache.ignite.cache.affinity.GridCacheAffinityKeyMapper} and third object is affinity assignment + * for given topology version. + * + * @param cacheName Cache name. + * @return Affinity job. + */ + static Callable<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> affinityJob( + String cacheName, long topVer) { + return new AffinityJob(cacheName, topVer); + } + + /** + * @param ctx {@code GridKernalContext} instance which provides deployment manager + * @param o Object for which deployment should be obtained. + * @return Deployment object for given instance, + * @throws IgniteCheckedException If node cannot create deployment for given object. 
+ */ + private static GridAffinityMessage affinityMessage(GridKernalContext ctx, Object o) throws IgniteCheckedException { + Class cls = o.getClass(); + + GridDeployment dep = ctx.deploy().deploy(cls, cls.getClassLoader()); + + if (dep == null) + throw new IgniteDeploymentException("Failed to deploy affinity object with class: " + cls.getName()); + + return new GridAffinityMessage( + ctx.config().getMarshaller().marshal(o), + cls.getName(), + dep.classLoaderId(), + dep.deployMode(), + dep.userVersion(), + dep.participants()); + } + + /** + * Unmarshalls transfer object from remote node within a given context. + * + * @param ctx Grid kernal context that provides deployment and marshalling services. + * @param sndNodeId {@link UUID} of the sender node. + * @param msg Transfer object that contains original serialized object and deployment information. + * @return Unmarshalled object. + * @throws IgniteCheckedException If node cannot obtain deployment. + */ + static Object unmarshall(GridKernalContext ctx, UUID sndNodeId, GridAffinityMessage msg) + throws IgniteCheckedException { + GridDeployment dep = ctx.deploy().getGlobalDeployment( + msg.deploymentMode(), + msg.sourceClassName(), + msg.sourceClassName(), + msg.userVersion(), + sndNodeId, + msg.classLoaderId(), + msg.loaderParticipants(), + null); + + if (dep == null) + throw new IgniteDeploymentException("Failed to obtain affinity object (is peer class loading turned on?): " + + msg); + + Object src = ctx.config().getMarshaller().unmarshal(msg.source(), dep.classLoader()); + + // Resource injection. + ctx.resource().inject(dep, dep.deployedClass(msg.sourceClassName()), src); + + return src; + } + + /** Ensure singleton. */ + private GridAffinityUtils() { + // No-op. 
+ } + + /** + * + */ + @GridInternal + private static class AffinityJob implements + Callable<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + private String cacheName; + + /** */ + private long topVer; + + /** + * @param cacheName Cache name. + */ + private AffinityJob(@Nullable String cacheName, long topVer) { + this.cacheName = cacheName; + this.topVer = topVer; + } + + /** + * + */ + public AffinityJob() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> call() + throws Exception { + assert ignite != null; + assert log != null; + + GridKernal kernal = ((GridKernal) ignite); + + GridCacheContext<Object, Object> cctx = kernal.internalCache(cacheName).context(); + + assert cctx != null; + + GridKernalContext ctx = kernal.context(); + + return F.t( + affinityMessage(ctx, cctx.config().getAffinity()), + affinityMessage(ctx, cctx.config().getAffinityMapper()), + new GridAffinityAssignment(topVer, cctx.affinity().assignments(topVer))); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + out.writeLong(topVer); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + topVer = in.readLong(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java new file mode 100644 index 0000000..718980b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java @@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.affinity;

import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.jetbrains.annotations.*;

import java.util.*;

/**
 * Cache affinity function context implementation. Simple bean that holds all required fields.
 */
public class GridCacheAffinityFunctionContextImpl implements GridCacheAffinityFunctionContext {
    /** Topology snapshot. */
    private final List<ClusterNode> topSnapshot;

    /** Previous affinity assignment. */
    private final List<List<ClusterNode>> prevAssignment;

    /** Discovery event that caused this topology change. */
    private final IgniteDiscoveryEvent discoEvt;

    /** Topology version. */
    private final long topVer;

    /** Number of backups to assign. */
    private final int backups;

    /**
     * @param topSnapshot Topology snapshot.
     * @param prevAssignment Previous affinity assignment.
     * @param discoEvt Discovery event that caused this topology change.
     * @param topVer Topology version.
     * @param backups Number of backups to assign.
     */
    public GridCacheAffinityFunctionContextImpl(List<ClusterNode> topSnapshot, List<List<ClusterNode>> prevAssignment,
        IgniteDiscoveryEvent discoEvt, long topVer, int backups) {
        this.topSnapshot = topSnapshot;
        this.prevAssignment = prevAssignment;
        this.discoEvt = discoEvt;
        this.topVer = topVer;
        this.backups = backups;
    }

    /** {@inheritDoc} */
    @Nullable @Override public List<ClusterNode> previousAssignment(int part) {
        // The method is declared @Nullable: report "no previous assignment" as null
        // instead of failing with NullPointerException when none was supplied.
        return prevAssignment != null ? prevAssignment.get(part) : null;
    }

    /** {@inheritDoc} */
    @Override public List<ClusterNode> currentTopologySnapshot() {
        return topSnapshot;
    }

    /** {@inheritDoc} */
    @Override public long currentTopologyVersion() {
        return topVer;
    }

    /** {@inheritDoc} */
    @Nullable @Override public IgniteDiscoveryEvent discoveryEvent() {
        return discoEvt;
    }

    /** {@inheritDoc} */
    @Override public int backups() {
        return backups;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/package.html new file mode 100644 index 0000000..5bb1751 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/package.html @@ -0,0 +1,23 @@
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements. See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Data affinity processor. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java new file mode 100644 index 0000000..9c34e9d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.closure; + +import org.jetbrains.annotations.*; + +/** + * This enumeration defines different types of closure + * processing by the closure processor. + */ +public enum GridClosurePolicy { + /** Public execution pool. */ + PUBLIC_POOL, + + /** P2P execution pool. */ + P2P_POOL, + + /** System execution pool. */ + SYSTEM_POOL, + + /** GGFS pool. */ + GGFS_POOL; + + /** Enum values. */ + private static final GridClosurePolicy[] VALS = values(); + + /** + * Efficiently gets enumerated value from its ordinal. + * + * @param ord Ordinal value. + * @return Enumerated value. + */ + @Nullable public static GridClosurePolicy fromOrdinal(int ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } +}
