# ignite-63

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b458bd09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b458bd09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b458bd09

Branch: refs/heads/ignite-63
Commit: b458bd090a23639915f0fe8861c50d763f7cc80f
Parents: c602549
Author: sboikov <[email protected]>
Authored: Fri Jan 23 00:08:40 2015 +0300
Committer: sboikov <[email protected]>
Committed: Fri Jan 23 00:08:44 2015 +0300

----------------------------------------------------------------------
 ...GridClientAbstractMultiThreadedSelfTest.java |    2 +-
 .../GridClientPartitionAffinitySelfTest.java    |    2 +-
 .../internal/GridEventConsumeHandler.java       |    3 +-
 .../org/apache/ignite/internal/GridKernal.java  |    8 +-
 .../ignite/internal/GridKernalContext.java      |    8 +-
 .../ignite/internal/GridKernalContextImpl.java  |    8 +-
 .../internal/GridMessageListenHandler.java      |    2 +-
 .../ignite/internal/IgniteMessagingImpl.java    |    2 +-
 .../affinity/GridAffinityAssignment.java        |  168 ++
 .../affinity/GridAffinityAssignmentCache.java   |  409 ++++
 .../affinity/GridAffinityMessage.java           |  164 ++
 .../affinity/GridAffinityProcessor.java         |  528 +++++
 .../processors/affinity/GridAffinityUtils.java  |  187 ++
 .../GridCacheAffinityFunctionContextImpl.java   |   83 +
 .../internal/processors/affinity/package.html   |   23 +
 .../processors/closure/GridClosurePolicy.java   |   51 +
 .../closure/GridClosureProcessor.java           | 1744 +++++++++++++++++
 .../GridMasterLeaveAwareComputeJobAdapter.java  |   36 +
 .../closure/GridPeerDeployAwareTaskAdapter.java |   60 +
 .../internal/processors/closure/package.html    |   23 +
 .../continuous/GridContinuousHandler.java       |  105 +
 .../continuous/GridContinuousMessage.java       |  256 +++
 .../continuous/GridContinuousMessageType.java   |   56 +
 .../continuous/GridContinuousProcessor.java     | 1846 ++++++++++++++++++
 .../dataload/GridDataLoadCacheUpdaters.java     |  292 +++
 .../dataload/GridDataLoadRequest.java           |  548 ++++++
 .../dataload/GridDataLoadResponse.java          |  181 ++
 .../dataload/GridDataLoadUpdateJob.java         |  120 ++
 .../dataload/GridDataLoaderFuture.java          |   75 +
 .../dataload/GridDataLoaderProcessor.java       |  318 +++
 .../dataload/IgniteDataLoaderImpl.java          | 1346 +++++++++++++
 .../internal/processors/dataload/package.html   |   23 +
 .../processors/fs/GridGgfsDataManager.java      |    2 +-
 .../processors/fs/GridGgfsIpcHandler.java       |    2 +-
 .../task/GridStreamerBroadcastTask.java         |    2 +-
 .../streamer/task/GridStreamerQueryTask.java    |    2 +-
 .../streamer/task/GridStreamerReduceTask.java   |    2 +-
 .../GridTcpCommunicationMessageFactory.java     |    4 +-
 .../affinity/GridAffinityAssignment.java        |  168 --
 .../affinity/GridAffinityAssignmentCache.java   |  409 ----
 .../affinity/GridAffinityMessage.java           |  164 --
 .../affinity/GridAffinityProcessor.java         |  528 -----
 .../processors/affinity/GridAffinityUtils.java  |  187 --
 .../GridCacheAffinityFunctionContextImpl.java   |   83 -
 .../kernal/processors/affinity/package.html     |   23 -
 .../cache/GridCacheAffinityManager.java         |    2 +-
 .../processors/cache/GridCacheContext.java      |    2 +-
 .../GridCacheContinuousQueryAdapter.java        |    2 +-
 .../GridCacheContinuousQueryHandler.java        |    2 +-
 .../processors/closure/GridClosurePolicy.java   |   51 -
 .../closure/GridClosureProcessor.java           | 1744 -----------------
 .../GridMasterLeaveAwareComputeJobAdapter.java  |   36 -
 .../closure/GridPeerDeployAwareTaskAdapter.java |   60 -
 .../grid/kernal/processors/closure/package.html |   23 -
 .../continuous/GridContinuousHandler.java       |  105 -
 .../continuous/GridContinuousMessage.java       |  256 ---
 .../continuous/GridContinuousMessageType.java   |   56 -
 .../continuous/GridContinuousProcessor.java     | 1846 ------------------
 .../dataload/GridDataLoadCacheUpdaters.java     |  292 ---
 .../dataload/GridDataLoadRequest.java           |  548 ------
 .../dataload/GridDataLoadResponse.java          |  181 --
 .../dataload/GridDataLoadUpdateJob.java         |  120 --
 .../dataload/GridDataLoaderFuture.java          |   75 -
 .../dataload/GridDataLoaderProcessor.java       |  318 ---
 .../dataload/IgniteDataLoaderImpl.java          | 1346 -------------
 .../kernal/processors/dataload/package.html     |   23 -
 .../GridAffinityProcessorAbstractSelfTest.java  |  194 ++
 ...AffinityProcessorConsistentHashSelfTest.java |   31 +
 ...GridAffinityProcessorRendezvousSelfTest.java |   31 +
 .../IgniteCacheEntryListenerAbstractTest.java   |    2 +-
 .../closure/GridClosureProcessorRemoteTest.java |  119 ++
 .../closure/GridClosureProcessorSelfTest.java   |  541 +++++
 .../internal/processors/closure/package.html    |   23 +
 .../continuous/GridEventConsumeSelfTest.java    | 1079 ++++++++++
 .../continuous/GridMessageListenSelfTest.java   |  489 +++++
 .../dataload/GridDataLoaderImplSelfTest.java    |  215 ++
 .../dataload/GridDataLoaderPerformanceTest.java |  215 ++
 .../GridDataLoaderProcessorSelfTest.java        |  883 +++++++++
 .../GridServiceProcessorAbstractSelfTest.java   |    2 +-
 .../util/future/GridFutureAdapterSelfTest.java  |    2 +-
 .../GridCachePartitionFairAffinitySelfTest.java |    2 +-
 .../GridAffinityProcessorAbstractSelfTest.java  |  194 --
 ...AffinityProcessorConsistentHashSelfTest.java |   31 -
 ...GridAffinityProcessorRendezvousSelfTest.java |   31 -
 .../cache/GridCacheAffinityApiSelfTest.java     |    2 +-
 ...dCachePartitionedQueueEntryMoveSelfTest.java |    2 +-
 ...ridCacheContinuousQueryAbstractSelfTest.java |    2 +-
 .../closure/GridClosureProcessorRemoteTest.java |  119 --
 .../closure/GridClosureProcessorSelfTest.java   |  541 -----
 .../grid/kernal/processors/closure/package.html |   23 -
 .../continuous/GridEventConsumeSelfTest.java    | 1079 ----------
 .../continuous/GridMessageListenSelfTest.java   |  489 -----
 .../dataload/GridDataLoaderImplSelfTest.java    |  215 --
 .../dataload/GridDataLoaderPerformanceTest.java |  215 --
 .../GridDataLoaderProcessorSelfTest.java        |  883 ---------
 .../testsuites/bamboo/GridBasicTestSuite.java   |    6 +-
 .../bamboo/GridDataGridTestSuite.java           |    2 +-
 97 files changed, 12501 insertions(+), 12502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/clients/src/test/java/org/apache/ignite/client/GridClientAbstractMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/client/GridClientAbstractMultiThreadedSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/client/GridClientAbstractMultiThreadedSelfTest.java
index a9c1ec0..6ce379d 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/client/GridClientAbstractMultiThreadedSelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/client/GridClientAbstractMultiThreadedSelfTest.java
@@ -29,7 +29,7 @@ import org.apache.ignite.resources.*;
 import org.apache.ignite.client.balancer.*;
 import org.apache.ignite.client.impl.*;
 import org.apache.ignite.client.ssl.*;
-import org.gridgain.grid.kernal.processors.affinity.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/clients/src/test/java/org/apache/ignite/client/impl/GridClientPartitionAffinitySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/client/impl/GridClientPartitionAffinitySelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/client/impl/GridClientPartitionAffinitySelfTest.java
index de8de76..3f3b624 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/client/impl/GridClientPartitionAffinitySelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/client/impl/GridClientPartitionAffinitySelfTest.java
@@ -21,7 +21,7 @@ import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cache.affinity.consistenthash.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.client.*;
-import org.gridgain.grid.kernal.processors.affinity.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.gridgain.testframework.*;
 import org.gridgain.testframework.junits.common.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 9aaddc5..4c224f2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -20,13 +20,12 @@ package org.apache.ignite.internal;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.continuous.*;
+import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
index 5c923bd..f6eb037 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
@@ -47,12 +47,12 @@ import org.apache.ignite.internal.managers.loadbalancer.*;
 import org.apache.ignite.internal.managers.securesession.*;
 import org.apache.ignite.internal.managers.security.*;
 import org.apache.ignite.internal.managers.swapspace.*;
-import org.gridgain.grid.kernal.processors.affinity.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.apache.ignite.internal.processors.clock.*;
-import org.gridgain.grid.kernal.processors.closure.*;
-import org.gridgain.grid.kernal.processors.continuous.*;
-import org.gridgain.grid.kernal.processors.dataload.*;
+import org.apache.ignite.internal.processors.closure.*;
+import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.processors.dataload.*;
 import org.apache.ignite.internal.processors.email.*;
 import org.apache.ignite.internal.processors.interop.*;
 import org.apache.ignite.internal.processors.job.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 132497e..e93bbd6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -33,12 +33,12 @@ import org.apache.ignite.internal.managers.indexing.*;
 import org.apache.ignite.internal.managers.loadbalancer.*;
 import org.apache.ignite.internal.managers.securesession.*;
 import org.apache.ignite.internal.managers.swapspace.*;
-import org.gridgain.grid.kernal.processors.affinity.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.apache.ignite.internal.processors.clock.*;
-import org.gridgain.grid.kernal.processors.closure.*;
-import org.gridgain.grid.kernal.processors.continuous.*;
-import org.gridgain.grid.kernal.processors.dataload.*;
+import org.apache.ignite.internal.processors.closure.*;
+import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.processors.dataload.*;
 import org.apache.ignite.internal.processors.email.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.interop.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 26c69b9..7a6e9fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -33,14 +33,14 @@ import org.apache.ignite.internal.managers.loadbalancer.*;
 import org.apache.ignite.internal.managers.securesession.*;
 import org.apache.ignite.internal.managers.security.*;
 import org.apache.ignite.internal.managers.swapspace.*;
-import org.gridgain.grid.kernal.processors.affinity.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.kernal.processors.cache.dr.*;
 import org.gridgain.grid.kernal.processors.cache.dr.os.*;
 import org.apache.ignite.internal.processors.clock.*;
-import org.gridgain.grid.kernal.processors.closure.*;
-import org.gridgain.grid.kernal.processors.continuous.*;
-import org.gridgain.grid.kernal.processors.dataload.*;
+import org.apache.ignite.internal.processors.closure.*;
+import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.processors.dataload.*;
 import org.apache.ignite.internal.processors.email.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.interop.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index abff829..9aca42c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal;
 import org.apache.ignite.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.internal.managers.deployment.*;
-import org.gridgain.grid.kernal.processors.continuous.*;
+import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index d9bdc3b..fe82ac5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.kernal.processors.continuous.*;
+import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
new file mode 100644
index 0000000..580f64c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cluster.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Cached affinity calculations.
+ */
+class GridAffinityAssignment implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Topology version. */
+    private final long topVer;
+
+    /** Collection of calculated affinity nodes. */
+    private List<List<ClusterNode>> assignment;
+
+    /** Map of primary node partitions. */
+    private final Map<UUID, Set<Integer>> primary;
+
+    /** Map of backup node partitions. */
+    private final Map<UUID, Set<Integer>> backup;
+
+    /**
+     * Constructs cached affinity calculations item.
+     *
+     * @param topVer Topology version.
+     */
+    GridAffinityAssignment(long topVer) {
+        this.topVer = topVer;
+        primary = new HashMap<>();
+        backup = new HashMap<>();
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param assignment Assignment.
+     */
+    GridAffinityAssignment(long topVer, List<List<ClusterNode>> assignment) {
+        this.topVer = topVer;
+        this.assignment = assignment;
+
+        primary = new HashMap<>();
+        backup = new HashMap<>();
+
+        initPrimaryBackupMaps();
+    }
+
+    /**
+     * @return Affinity assignment.
+     */
+    public List<List<ClusterNode>> assignment() {
+        return assignment;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public long topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * Get affinity nodes for partition.
+     *
+     * @param part Partition.
+     * @return Affinity nodes.
+     */
+    public List<ClusterNode> get(int part) {
+        assert part >= 0 && part < assignment.size() : "Affinity partition is 
out of range" +
+            " [part=" + part + ", partitions=" + assignment.size() + ']';
+
+        return assignment.get(part);
+    }
+
+    /**
+     * Get primary partitions for specified node ID.
+     *
+     * @param nodeId Node ID to get primary partitions for.
+     * @return Primary partitions for specified node ID.
+     */
+    public Set<Integer> primaryPartitions(UUID nodeId) {
+        Set<Integer> set = primary.get(nodeId);
+
+        return set == null ? Collections.<Integer>emptySet() : 
Collections.unmodifiableSet(set);
+    }
+
+    /**
+     * Get backup partitions for specified node ID.
+     *
+     * @param nodeId Node ID to get backup partitions for.
+     * @return Backup partitions for specified node ID.
+     */
+    public Set<Integer> backupPartitions(UUID nodeId) {
+        Set<Integer> set = backup.get(nodeId);
+
+        return set == null ? Collections.<Integer>emptySet() : 
Collections.unmodifiableSet(set);
+    }
+
+    /**
+     * Initializes primary and backup maps.
+     */
+    private void initPrimaryBackupMaps() {
+        // Temporary mirrors with modifiable partition's collections.
+        Map<UUID, Set<Integer>> tmpPrm = new HashMap<>();
+        Map<UUID, Set<Integer>> tmpBkp = new HashMap<>();
+
+        for (int partsCnt = assignment.size(), p = 0; p < partsCnt; p++) {
+            // Use the first node as primary, other - backups.
+            Map<UUID, Set<Integer>> tmp = tmpPrm;
+            Map<UUID, Set<Integer>> map = primary;
+
+            for (ClusterNode node : assignment.get(p)) {
+                UUID id = node.id();
+
+                Set<Integer> set = tmp.get(id);
+
+                if (set == null) {
+                    tmp.put(id, set = new HashSet<>());
+                    map.put(id, Collections.unmodifiableSet(set));
+                }
+
+                set.add(p);
+
+                // Use the first node as primary, other - backups.
+                tmp = tmpBkp;
+                map = backup;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return (int)(topVer ^ (topVer >>> 32));
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("SimplifiableIfStatement")
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        return topVer == ((GridAffinityAssignment)o).topVer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
new file mode 100644
index 0000000..4730512
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.portables.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Affinity cached function.
+ */
+public class GridAffinityAssignmentCache {
+    /** Node order comparator. */
+    private static final Comparator<ClusterNode> nodeCmp = new 
GridNodeOrderComparator();
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Number of backups. */
+    private int backups;
+
+    /** Affinity function. */
+    private final GridCacheAffinityFunction aff;
+
+    /** Partitions count. */
+    private final int partsCnt;
+
+    /** Affinity mapper function. */
+    private final GridCacheAffinityKeyMapper affMapper;
+
+    /** Affinity calculation results cache: topology version => partition => 
nodes. */
+    private final ConcurrentMap<Long, GridAffinityAssignment> affCache;
+
+    /** Cache item corresponding to the head topology version. */
+    private final AtomicReference<GridAffinityAssignment> head;
+
+    /** Discovery manager. */
+    private final GridCacheContext ctx;
+
+    /** Ready futures. */
+    private final ConcurrentMap<Long, AffinityReadyFuture> readyFuts = new 
ConcurrentHashMap8<>();
+
+    /** Log. */
+    private IgniteLogger log;
+
+    /**
+     * Creates the affinity assignment cache for a single cache.
+     *
+     * @param ctx Cache context.
+     * @param cacheName Cache name.
+     * @param aff Affinity function.
+     * @param affMapper Affinity key mapper.
+     * @param backups Number of backups.
+     */
+    @SuppressWarnings("unchecked")
+    public GridAffinityAssignmentCache(GridCacheContext ctx, String cacheName, GridCacheAffinityFunction aff,
+        GridCacheAffinityKeyMapper affMapper, int backups) {
+        this.ctx = ctx;
+        this.cacheName = cacheName;
+        this.backups = backups;
+        this.aff = aff;
+        this.affMapper = affMapper;
+
+        // Placeholder head: version -1 and no assignment.
+        head = new AtomicReference<>(new GridAffinityAssignment(-1));
+        affCache = new ConcurrentLinkedHashMap<>();
+        partsCnt = aff.partitions();
+
+        log = ctx.logger(GridAffinityAssignmentCache.class);
+    }
+
+    /**
+     * Initializes affinity with given topology version and assignment. The assignment is calculated on remote nodes
+     * and brought to local node on partition map exchange.
+     * <p>
+     * Stores the assignment in the cache, makes it the head and completes every ready future registered for
+     * a topology version less than or equal to {@code topVer}.
+     *
+     * @param topVer Topology version.
+     * @param affAssignment Affinity assignment for topology version.
+     */
+    public void initialize(long topVer, List<List<ClusterNode>> affAssignment) {
+        GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment);
+
+        affCache.put(topVer, assignment);
+        head.set(assignment);
+
+        for (Map.Entry<Long, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+            // A future is satisfied once affinity for its own version (or a newer one) is available, so only
+            // futures registered for versions up to topVer are completed; later ones must keep waiting.
+            if (entry.getKey() <= topVer)
+                entry.getValue().onDone(topVer);
+        }
+    }
+
+    /**
+     * Calculates affinity for the given topology version, caches it, advances the head if this version is
+     * newer and completes ready futures registered for versions up to {@code topVer}.
+     *
+     * @param topVer Topology version to calculate affinity cache for.
+     * @param discoEvt Discovery event that caused this topology version change.
+     * @return Affinity assignment for the topology version.
+     */
+    public List<List<ClusterNode>> calculate(long topVer, IgniteDiscoveryEvent discoEvt) {
+        if (log.isDebugEnabled())
+            log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
+                ", discoEvt=" + discoEvt + ']');
+
+        // Assignment of the preceding topology version, if it is still cached.
+        GridAffinityAssignment prevItem = affCache.get(topVer - 1);
+        List<List<ClusterNode>> prevAssignment = prevItem != null ? prevItem.assignment() : null;
+
+        List<ClusterNode> affNodes;
+
+        if (!ctx.isLocal())
+            // Nodes snapshot for the requested topology version.
+            affNodes = sort(ctx.discovery().cacheAffinityNodes(cacheName, topVer));
+        else
+            // Local cache always uses the local node only.
+            affNodes = Collections.singletonList(ctx.localNode());
+
+        GridCacheAffinityFunctionContextImpl affCtx =
+            new GridCacheAffinityFunctionContextImpl(affNodes, prevAssignment, discoEvt, topVer, backups);
+
+        GridAffinityAssignment cached = F.addIfAbsent(affCache, topVer,
+            new GridAffinityAssignment(topVer, aff.assignPartitions(affCtx)));
+
+        // Advance the head unless it already points at this or a newer version; retry on a lost CAS.
+        GridAffinityAssignment curHead;
+
+        do {
+            curHead = head.get();
+        }
+        while (curHead.topologyVersion() < topVer && !head.compareAndSet(curHead, cached));
+
+        for (Map.Entry<Long, AffinityReadyFuture> e : readyFuts.entrySet()) {
+            // Futures registered for later versions keep waiting.
+            if (e.getKey() > topVer)
+                continue;
+
+            if (log.isDebugEnabled())
+                log.debug("Completing topology ready future (calculated affinity) [locNodeId=" + ctx.localNodeId() +
+                    ", futVer=" + e.getKey() + ", topVer=" + topVer + ']');
+
+            e.getValue().onDone(topVer);
+        }
+
+        return cached.assignment();
+    }
+
+    /**
+     * @return Last calculated affinity version.
+     */
+    public long lastVersion() {
+        return head.get().topologyVersion();
+    }
+
+    /**
+     * Clean up outdated cache items.
+     *
+     * @param topVer Actual topology version, older versions will be removed.
+     */
+    public void cleanUpCache(long topVer) {
+        if (log.isDebugEnabled())
+            log.debug("Cleaning up cache for version [locNodeId=" + 
ctx.localNodeId() +
+                ", topVer=" + topVer + ']');
+
+        for (Iterator<Long> it = affCache.keySet().iterator(); it.hasNext(); )
+            if (it.next() < topVer)
+                it.remove();
+    }
+
+    /**
+     * Gets the affinity assignment cached for a topology version.
+     *
+     * @param topVer Topology version.
+     * @return Affinity assignment.
+     */
+    public List<List<ClusterNode>> assignments(long topVer) {
+        return cachedAffinity(topVer).assignment();
+    }
+
+    /**
+     * Gets future that will be completed after affinity for topology version {@code topVer} is calculated.
+     *
+     * @param topVer Topology version to await for.
+     * @return Future that will be completed after affinity for topology version {@code topVer} is calculated,
+     *      or {@code null} if the head assignment already has a version not lower than {@code topVer}
+     *      at the time of the first check (callers must handle the {@code null} case).
+     */
+    public IgniteFuture<Long> readyFuture(long topVer) {
+        GridAffinityAssignment aff = head.get();
+
+        // Fast path: requested version is already available, no future is created (null is returned).
+        if (aff.topologyVersion() >= topVer) {
+            if (log.isDebugEnabled())
+                log.debug("Returning finished future for readyFuture [head=" + 
aff.topologyVersion() +
+                    ", topVer=" + topVer + ']');
+
+            return null;
+        }
+
+        // Presumably returns the future already registered for topVer, or registers the new one - TODO confirm
+        // against F.addIfAbsent.
+        GridFutureAdapter<Long> fut = F.addIfAbsent(readyFuts, topVer,
+            new AffinityReadyFuture(ctx.kernalContext()));
+
+        // Re-read head after registration: if the version has been reached in the meantime,
+        // the future is completed right here.
+        aff = head.get();
+
+        if (aff.topologyVersion() >= topVer) {
+            if (log.isDebugEnabled())
+                log.debug("Completing topology ready future right away [head=" 
+ aff.topologyVersion() +
+                    ", topVer=" + topVer + ']');
+
+            fut.onDone(topVer);
+        }
+
+        return fut;
+    }
+
+    /**
+     * Gets the partition count stored in {@code partsCnt}.
+     *
+     * @return Partition count.
+     */
+    public int partitions() {
+        return partsCnt;
+    }
+
+    /**
+     * Calculates partition for a key.
+     * <p>
+     * NOTE: Always use this method to calculate partition id for a key provided by user. The key is first
+     * passed through the affinity mapper, and only the resulting affinity key is handed to the affinity function.
+     * <p>
+     * When portable mode is enabled the key is marshalled to portable form first; if marshalling fails the
+     * error is logged and the original key is used.
+     *
+     * @param key Key.
+     * @return Partition.
+     */
+    public int partition(Object key) {
+        Object keyToMap = key;
+
+        if (ctx.portableEnabled()) {
+            try {
+                keyToMap = ctx.marshalToPortable(key);
+            }
+            catch (PortableException e) {
+                U.error(log, "Failed to marshal key to portable: " + key, e);
+            }
+        }
+
+        Object affKey = affMapper.affinityKey(keyToMap);
+
+        return aff.partition(affKey);
+    }
+
+    /**
+     * Returns affinity nodes of a partition, resolved from the cached affinity of the given topology version.
+     *
+     * @param part Partition.
+     * @param topVer Topology version.
+     * @return Affinity nodes.
+     */
+    public List<ClusterNode> nodes(int part, long topVer) {
+        GridAffinityAssignment cached = cachedAffinity(topVer);
+
+        return cached.get(part);
+    }
+
+    /**
+     * Returns primary partitions of a node, resolved from the cached affinity of the given topology version.
+     *
+     * @param nodeId Node ID to get primary partitions for.
+     * @param topVer Topology version.
+     * @return Primary partitions for specified node ID.
+     */
+    public Set<Integer> primaryPartitions(UUID nodeId, long topVer) {
+        GridAffinityAssignment cached = cachedAffinity(topVer);
+
+        return cached.primaryPartitions(nodeId);
+    }
+
+    /**
+     * Returns backup partitions of a node, resolved from the cached affinity of the given topology version.
+     *
+     * @param nodeId Node ID to get backup partitions for.
+     * @param topVer Topology version.
+     * @return Backup partitions for specified node ID.
+     */
+    public Set<Integer> backupPartitions(UUID nodeId, long topVer) {
+        GridAffinityAssignment cached = cachedAffinity(topVer);
+
+        return cached.backupPartitions(nodeId);
+    }
+
+    /**
+     * Get cached affinity for specified topology version.
+     * <p>
+     * A value of {@code -1} is replaced with the last calculated version; for any other value the call
+     * first waits (see {@link #awaitTopologyVersion(long)}) until that version is ready.
+     *
+     * @param topVer Topology version, or {@code -1} for the last calculated one.
+     * @return Cached affinity.
+     * @throws IllegalStateException If neither the head assignment nor the affinity cache holds an entry
+     *      for the requested version.
+     */
+    private GridAffinityAssignment cachedAffinity(long topVer) {
+        if (topVer == -1)
+            topVer = lastVersion();
+        else
+            awaitTopologyVersion(topVer);
+
+        assert topVer >= 0 : topVer;
+
+        // Head is tried first; the versioned cache is consulted only when head has a different version.
+        GridAffinityAssignment cache = head.get();
+
+        if (cache.topologyVersion() != topVer) {
+            cache = affCache.get(topVer);
+
+            if (cache == null) {
+                throw new IllegalStateException("Getting affinity for topology 
version earlier than affinity is " +
+                    "calculated [locNodeId=" + ctx.localNodeId() + ", topVer=" 
+ topVer +
+                    ", head=" + head.get().topologyVersion() + ']');
+            }
+        }
+
+        assert cache.topologyVersion() == topVer : "Invalid cached affinity: " 
+ cache;
+
+        return cache;
+    }
+
+    /**
+     * Blocks the caller until affinity for the given topology version is ready.
+     *
+     * @param topVer Topology version to wait.
+     */
+    private void awaitTopologyVersion(long topVer) {
+        // Nothing to wait for when the head assignment already covers the requested version.
+        if (head.get().topologyVersion() >= topVer)
+            return;
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Will wait for topology version [locNodeId=" + ctx.localNodeId() +
+                    ", topVer=" + topVer + ']');
+            }
+
+            IgniteFuture<Long> readyFut = readyFuture(topVer);
+
+            // readyFuture() returns null when the requested version is already available.
+            if (readyFut != null)
+                readyFut.get();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(
+                "Failed to wait for affinity ready future for topology version: " + topVer, e);
+        }
+    }
+
+    /**
+     * Copies the given nodes into a new list ordered by {@code nodeCmp}; the input collection is not modified.
+     *
+     * @param nodes Nodes to sort.
+     * @return Sorted list of nodes.
+     */
+    private List<ClusterNode> sort(Collection<ClusterNode> nodes) {
+        List<ClusterNode> ordered = new ArrayList<>(nodes);
+
+        Collections.sort(ordered, nodeCmp);
+
+        return ordered;
+    }
+
+    /**
+     * Affinity ready future. Will remove itself from ready futures map.
+     * <p>
+     * The result value is the topology version, which is also the key under which the future is looked up
+     * in {@code readyFuts} on completion.
+     */
+    private class AffinityReadyFuture extends GridFutureAdapter<Long> {
+        /** Serialization version. */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Empty constructor required by {@link Externalizable}.
+         */
+        public AffinityReadyFuture() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Kernal context.
+         */
+        private AffinityReadyFuture(GridKernalContext ctx) {
+            super(ctx);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * NOTE(review): {@code res} is asserted non-null and is used as the map key below, so this future
+         * appears to expect completion with a topology version only - confirm it is never completed with
+         * an error and a {@code null} result.
+         */
+        @Override public boolean onDone(Long res, @Nullable Throwable err) {
+            assert res != null;
+
+            boolean done = super.onDone(res, err);
+
+            // Unregister only if this call actually completed the future, and only the mapping
+            // that points to this very instance.
+            if (done)
+                readyFuts.remove(res, this);
+
+            return done;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityMessage.java
new file mode 100644
index 0000000..1e67237
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityMessage.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Object wrapper containing serialized byte array of original object and deployment information.
+ * <p>
+ * The class is {@link Externalizable}: {@link #writeExternal(ObjectOutput)} and
+ * {@link #readExternal(ObjectInput)} must process the fields in exactly the same order.
+ */
+class GridAffinityMessage implements Externalizable, 
IgniteOptimizedMarshallable {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Value returned by {@link #ggClassId()}. It is never assigned in this class
+     * (presumably populated externally by the optimized marshaller - TODO confirm).
+     */
+    @SuppressWarnings({"NonConstantFieldWithUpperCaseName", 
"AbbreviationUsage", "UnusedDeclaration"})
+    private static Object GG_CLASS_ID;
+
+    /** Source object (serialized bytes). */
+    private byte[] src;
+
+    /** Class loader ID. */
+    private IgniteUuid clsLdrId;
+
+    /** Deployment mode. */
+    private IgniteDeploymentMode depMode;
+
+    /** Source object class name. */
+    private String srcClsName;
+
+    /** User version. */
+    private String userVer;
+
+    /** Node class loader participants. */
+    @GridToStringInclude
+    private Map<UUID, IgniteUuid> ldrParties;
+
+    /**
+     * @param src Source object.
+     * @param srcClsName Source object class name.
+     * @param clsLdrId Class loader ID.
+     * @param depMode Deployment mode.
+     * @param userVer User version.
+     * @param ldrParties Node loader participant map.
+     */
+    GridAffinityMessage(
+        byte[] src,
+        String srcClsName,
+        IgniteUuid clsLdrId,
+        IgniteDeploymentMode depMode,
+        String userVer,
+        Map<UUID, IgniteUuid> ldrParties) {
+        this.src = src;
+        this.srcClsName = srcClsName;
+        this.depMode = depMode;
+        this.clsLdrId = clsLdrId;
+        this.userVer = userVer;
+        this.ldrParties = ldrParties;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridAffinityMessage() {
+        // No-op.
+    }
+
+    /**
+     * @return Source object. The internal array is returned as is, without copying.
+     */
+    public byte[] source() {
+        return src;
+    }
+
+    /**
+     * @return Class loader ID.
+     */
+    public IgniteUuid classLoaderId() {
+        return clsLdrId;
+    }
+
+    /**
+     * @return Deployment mode.
+     */
+    public IgniteDeploymentMode deploymentMode() {
+        return depMode;
+    }
+
+    /**
+     * @return Source message class name.
+     */
+    public String sourceClassName() {
+        return srcClsName;
+    }
+
+    /**
+     * @return User version.
+     */
+    public String userVersion() {
+        return userVer;
+    }
+
+    /**
+     * @return Unmodifiable view of the node class loader participant map, or {@code null} if the map is not set.
+     */
+    public Map<UUID, IgniteUuid> loaderParticipants() {
+        return ldrParties != null ? Collections.unmodifiableMap(ldrParties) : 
null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object ggClassId() {
+        return GG_CLASS_ID;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // Field order here must stay in sync with readExternal().
+        U.writeByteArray(out, src);
+
+        // Deployment mode travels as its ordinal. NOTE(review): a null depMode fails on this line.
+        out.writeInt(depMode.ordinal());
+
+        U.writeGridUuid(out, clsLdrId);
+        U.writeString(out, srcClsName);
+        U.writeString(out, userVer);
+        U.writeMap(out, ldrParties);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        // Field order here must stay in sync with writeExternal().
+        src = U.readByteArray(in);
+
+        depMode = IgniteDeploymentMode.fromOrdinal(in.readInt());
+
+        clsLdrId = U.readGridUuid(in);
+        srcClsName = U.readString(in);
+        userVer = U.readString(in);
+        ldrParties = U.readMap(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridAffinityMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
new file mode 100644
index 0000000..4d7a466
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.timeout.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static 
org.apache.ignite.internal.processors.affinity.GridAffinityUtils.*;
+
+/**
+ * Data affinity processor.
+ */
+public class GridAffinityProcessor extends GridProcessorAdapter {
+    /** Affinity map cleanup delay (ms). */
+    private static final long AFFINITY_MAP_CLEAN_UP_DELAY = 3000;
+
+    /** Retries to get affinity in case of error. */
+    private static final int ERROR_RETRIES = 3;
+
+    /** Time to wait between errors (in milliseconds). */
+    private static final long ERROR_WAIT = 500;
+
+    /** Null cache name. */
+    private static final String NULL_NAME = U.id8(UUID.randomUUID());
+
+    /** Affinity map. */
+    private final ConcurrentMap<AffinityAssignmentKey, 
IgniteFuture<AffinityInfo>> affMap = new ConcurrentHashMap8<>();
+
+    /**
+     * Discovery event listener that prunes {@link #affMap}.
+     * <p>
+     * On {@code EVT_NODE_FAILED} and {@code EVT_NODE_LEFT} it collects the keys to drop immediately, but
+     * removes them only after {@link #AFFINITY_MAP_CLEAN_UP_DELAY} milliseconds through a timeout object.
+     * {@code EVT_NODE_JOINED} is accepted by the assertion but triggers no cleanup.
+     */
+    private final GridLocalEventListener lsnr = new GridLocalEventListener() {
+        @Override public void onEvent(IgniteEvent evt) {
+            int evtType = evt.type();
+
+            assert evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT || 
evtType == EVT_NODE_JOINED;
+
+            if (affMap.isEmpty())
+                return; // Skip empty affinity map.
+
+            final IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
+
+            // Clean up affinity functions if such cache no more exists.
+            if (evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) {
+                // Names of all caches reported by the nodes currently known to discovery.
+                final Collection<String> caches = new HashSet<>();
+
+                for (ClusterNode clusterNode : ctx.discovery().allNodes())
+                    caches.addAll(U.cacheNames(clusterNode));
+
+                final Collection<AffinityAssignmentKey> rmv = new 
GridLeanSet<>();
+
+                // A key is dropped when its cache is reported by no node, or when its topology version
+                // is more than one version behind the version of this event.
+                for (AffinityAssignmentKey key : affMap.keySet()) {
+                    if (!caches.contains(key.cacheName) || key.topVer < 
discoEvt.topologyVersion() - 1)
+                        rmv.add(key);
+                }
+
+                // Removal is deferred: only the keys captured above are removed when the timeout fires.
+                ctx.timeout().addTimeoutObject(new GridTimeoutObjectAdapter(
+                    IgniteUuid.fromUuid(ctx.localNodeId()), 
AFFINITY_MAP_CLEAN_UP_DELAY) {
+                    @Override public void onTimeout() {
+                        affMap.keySet().removeAll(rmv);
+                    }
+                });
+            }
+        }
+    };
+
+    /**
+     * Creates the processor; the context is passed on to the {@link GridProcessorAdapter} constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    public GridAffinityProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        // Subscribe the affinity map cleanup listener to node failed/left/joined events.
+        ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, 
EVT_NODE_LEFT, EVT_NODE_JOINED);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        // Context or event manager may be unavailable; in that case there is nothing to unsubscribe from.
+        if (ctx == null || ctx.event() == null)
+            return;
+
+        ctx.event().removeLocalEventListener(lsnr);
+    }
+
+    /**
+     * Maps keys to nodes for given cache, using the current discovery topology version.
+     *
+     * @param cacheName Cache name.
+     * @param keys Keys to map.
+     * @return Map of nodes to keys; an empty map when {@code keys} is empty or when affinity for the cache
+     *      is not available.
+     * @throws IgniteCheckedException If failed.
+     */
+    public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String 
cacheName,
+        @Nullable Collection<? extends K> keys) throws IgniteCheckedException {
+        return keysToNodes(cacheName, keys);
+    }
+
+    /**
+     * Maps keys to nodes on default cache (cache name {@code null}), using the current discovery
+     * topology version.
+     *
+     * @param keys Keys to map.
+     * @return Map of nodes to keys; an empty map when {@code keys} is empty or when affinity for the cache
+     *      is not available.
+     * @throws IgniteCheckedException If failed.
+     */
+    public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable 
Collection<? extends K> keys)
+        throws IgniteCheckedException {
+        return keysToNodes(null, keys);
+    }
+
+    /**
+     * Maps single key to a node, using the current discovery topology version.
+     *
+     * @param cacheName Cache name.
+     * @param key Key to map.
+     * @return Picked node: the first node of the resulting mapping.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteCheckedException {
+        Map<ClusterNode, Collection<K>> mapped = keysToNodes(cacheName, F.asList(key));
+
+        if (mapped == null)
+            return null;
+
+        return F.first(mapped.keySet());
+    }
+
+    /**
+     * Maps single key to a node for the given topology version.
+     *
+     * @param cacheName Cache name.
+     * @param key Key to map.
+     * @param topVer Topology version.
+     * @return Picked node: the first node of the resulting mapping.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key, long topVer)
+        throws IgniteCheckedException {
+        Map<ClusterNode, Collection<K>> mapped = keysToNodes(cacheName, F.asList(key), topVer);
+
+        if (mapped == null)
+            return null;
+
+        return F.first(mapped.keySet());
+    }
+
+    /**
+     * Maps single key to a node on default cache (cache name {@code null}).
+     *
+     * @param key Key to map.
+     * @return Picked node.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public <K> ClusterNode mapKeyToNode(K key) throws 
IgniteCheckedException {
+        return mapKeyToNode(null, key);
+    }
+
+    /**
+     * Gets affinity key for cache key, using affinity information of the current discovery topology version.
+     *
+     * @param cacheName Cache name.
+     * @param key Cache key.
+     * @return Affinity key, or {@code null} if the key is {@code null} or if no affinity information
+     *      (or no affinity key mapper) is available for the cache.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable public Object affinityKey(@Nullable String cacheName, @Nullable Object key)
+        throws IgniteCheckedException {
+        if (key == null)
+            return null;
+
+        AffinityInfo info = affinityCache(cacheName, ctx.discovery().topologyVersion());
+
+        if (info == null || info.mapper == null)
+            return null;
+
+        // With portable mode enabled the mapper receives the portable form of the key.
+        Object mapperKey = info.portableEnabled ? ctx.portable().marshalToPortable(key) : key;
+
+        return info.mapper.affinityKey(mapperKey);
+    }
+
+    /**
+     * Substitutes {@link #NULL_NAME} for a {@code null} cache name.
+     *
+     * @param cacheName Cache name.
+     * @return Non-null cache name.
+     */
+    private String maskNull(@Nullable String cacheName) {
+        if (cacheName != null)
+            return cacheName;
+
+        return NULL_NAME;
+    }
+
+    /**
+     * Maps keys to nodes using the topology version currently reported by discovery.
+     *
+     * @param cacheName Cache name.
+     * @param keys Keys.
+     * @return Affinity map.
+     * @throws IgniteCheckedException If failed.
+     */
+    private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName,
+        Collection<? extends K> keys) throws IgniteCheckedException {
+        long curTopVer = ctx.discovery().topologyVersion();
+
+        return keysToNodes(cacheName, keys, curTopVer);
+    }
+
+    /**
+     * Maps keys to nodes for the given topology version.
+     * <p>
+     * An empty key collection yields an empty map. If the cache is started locally in {@code LOCAL} mode,
+     * all keys are mapped to the local node. Otherwise mapping is delegated to the affinity information
+     * of the cache; when none is available an empty map is returned.
+     *
+     * @param cacheName Cache name.
+     * @param keys Keys.
+     * @param topVer Topology version.
+     * @return Affinity map.
+     * @throws IgniteCheckedException If failed.
+     */
+    private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName,
+        Collection<? extends K> keys, long topVer) throws IgniteCheckedException {
+        if (F.isEmpty(keys))
+            return Collections.emptyMap();
+
+        ClusterNode locNode = ctx.discovery().localNode();
+
+        if (U.hasCache(locNode, cacheName)) {
+            boolean locMode = ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL;
+
+            if (locMode)
+                return F.asMap(locNode, (Collection<K>)keys);
+        }
+
+        AffinityInfo info = affinityCache(cacheName, topVer);
+
+        if (info == null)
+            return Collections.emptyMap();
+
+        return affinityMap(info, keys);
+    }
+
+    /**
+     * Gets affinity information for the given cache and topology version.
+     * <p>
+     * Results are shared through {@link #affMap}, keyed by cache name and topology version. If the cache is
+     * started on the local node the information is built from the local cache context; otherwise it is
+     * requested from remote nodes that have the cache, with up to {@link #ERROR_RETRIES} attempts and
+     * {@link #ERROR_WAIT} milliseconds between them.
+     *
+     * @param cacheName Cache name.
+     * @param topVer Topology version.
+     * @return Affinity cache, or {@code null} if the cache is started neither locally nor on any remote node.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @SuppressWarnings("ErrorNotRethrown")
+    private AffinityInfo affinityCache(@Nullable final String cacheName, long topVer) throws IgniteCheckedException {
+        AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer);
+
+        IgniteFuture<AffinityInfo> fut = affMap.get(key);
+
+        if (fut != null)
+            return fut.get();
+
+        ClusterNode loc = ctx.discovery().localNode();
+
+        // Check local node.
+        if (U.hasCache(loc, cacheName)) {
+            GridCacheContext<Object,Object> cctx = ctx.cache().internalCache(cacheName).context();
+
+            AffinityInfo info = new AffinityInfo(
+                cctx.config().getAffinity(),
+                cctx.config().getAffinityMapper(),
+                new GridAffinityAssignment(topVer, cctx.affinity().assignments(topVer)),
+                cctx.portableEnabled());
+
+            IgniteFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(ctx, info));
+
+            // Another thread registered the information first: use its result.
+            if (old != null)
+                info = old.get();
+
+            return info;
+        }
+
+        Collection<ClusterNode> cacheNodes = F.view(
+            ctx.discovery().remoteNodes(),
+            new P1<ClusterNode>() {
+                @Override public boolean apply(ClusterNode n) {
+                    return U.hasCache(n, cacheName);
+                }
+            });
+
+        if (F.isEmpty(cacheNodes))
+            return null;
+
+        GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>();
+
+        IgniteFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
+
+        // Another thread is already fetching the information: wait for its result.
+        if (old != null)
+            return old.get();
+
+        int max = ERROR_RETRIES;
+        int cnt = 0;
+
+        Iterator<ClusterNode> it = cacheNodes.iterator();
+
+        // We are here because affinity has not been fetched yet, or cache mode is LOCAL.
+        while (true) {
+            cnt++;
+
+            if (!it.hasNext())
+                it = cacheNodes.iterator();
+
+            // Double check since we deal with dynamic view.
+            if (!it.hasNext()) {
+                IgniteCheckedException err =
+                    new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName);
+
+                // This exception is thrown outside of the try block below, so nothing else would complete
+                // the registered future. Unregister and fail it explicitly, otherwise concurrent callers
+                // waiting on it would never be released.
+                affMap.remove(key, fut0);
+
+                fut0.onDone(err);
+
+                throw err;
+            }
+
+            ClusterNode n = it.next();
+
+            GridCacheMode mode = U.cacheMode(n, cacheName);
+
+            assert mode != null;
+
+            // Keys cannot be mapped through a remote LOCAL cache: fail the future.
+            if (mode == LOCAL) {
+                fut0.onDone(new IgniteCheckedException("Failed to map keys for LOCAL cache."));
+
+                // Will throw exception.
+                fut0.get();
+            }
+
+            try {
+                // Resolve cache context for remote node.
+                // Set affinity function before counting down on latch.
+                fut0.onDone(affinityInfoFromNode(cacheName, topVer, n));
+
+                break;
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to get affinity from node (will retry) [cache=" + cacheName +
+                        ", node=" + U.toShortString(n) + ", msg=" + e.getMessage() + ']');
+
+                if (cnt < max) {
+                    U.sleep(ERROR_WAIT);
+
+                    continue;
+                }
+
+                // The map is keyed by AffinityAssignmentKey, so the failed future has to be removed
+                // by that key (removing by cache name never matched any entry).
+                affMap.remove(key, fut0);
+
+                fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e));
+
+                break;
+            }
+            catch (RuntimeException | Error e) {
+                fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e));
+
+                break;
+            }
+        }
+
+        return fut0.get();
+    }
+
+    /**
+     * Requests {@link GridCacheAffinityFunction}, {@link org.apache.ignite.cache.affinity.GridCacheAffinityKeyMapper}
+     * and the affinity assignment from remote node, and resets the received function and mapper.
+     *
+     * @param cacheName Name of cache on which affinity is requested.
+     * @param topVer Topology version the affinity is requested for.
+     * @param n Node from which affinity is requested.
+     * @return Affinity cached function.
+     * @throws IgniteCheckedException If either local or remote node cannot get deployment for affinity objects.
+     */
+    private AffinityInfo affinityInfoFromNode(@Nullable String cacheName, long topVer, ClusterNode n)
+        throws IgniteCheckedException {
+        GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> res = ctx.closure()
+            .callAsyncNoFailover(BALANCE, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/).get();
+
+        GridCacheAffinityFunction affFunc = (GridCacheAffinityFunction)unmarshall(ctx, n.id(), res.get1());
+        GridCacheAffinityKeyMapper keyMapper = (GridCacheAffinityKeyMapper)unmarshall(ctx, n.id(), res.get2());
+
+        assert keyMapper != null;
+
+        // Bring to initial state.
+        affFunc.reset();
+        keyMapper.reset();
+
+        // Portable flag may be unknown for the node; unknown is treated as disabled.
+        Boolean portable = U.portableEnabled(n, cacheName);
+
+        boolean portableEnabled = portable != null && portable;
+
+        return new AffinityInfo(affFunc, keyMapper, res.get3(), portableEnabled);
+    }
+
+    /**
+     * Groups the given keys by their primary node.
+     *
+     * @param aff Affinity function.
+     * @param keys Keys.
+     * @return Affinity map.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings({"unchecked"})
+    private <K> Map<ClusterNode, Collection<K>> affinityMap(AffinityInfo aff, Collection<? extends K> keys)
+        throws IgniteCheckedException {
+        assert aff != null;
+        assert !F.isEmpty(keys);
+
+        try {
+            // Single key: the whole input collection is mapped to one node.
+            if (keys.size() == 1) {
+                ClusterNode single = primary(aff, F.first(keys));
+
+                return Collections.singletonMap(single, (Collection<K>)keys);
+            }
+
+            Map<ClusterNode, Collection<K>> res = new GridLeanMap<>();
+
+            for (K key : keys) {
+                ClusterNode primaryNode = primary(aff, key);
+
+                Collection<K> nodeKeys = res.get(primaryNode);
+
+                if (nodeKeys == null) {
+                    nodeKeys = new LinkedList<>();
+
+                    res.put(primaryNode, nodeKeys);
+                }
+
+                nodeKeys.add(key);
+            }
+
+            return res;
+        }
+        catch (IgniteException e) {
+            // Affinity calculation may lead to IgniteException if no cache nodes found for pair cacheName+topVer.
+            throw new IgniteCheckedException("Failed to get affinity map for keys: " + keys, e);
+        }
+    }
+
+    /**
+     * Get primary node for cached key: the first node assigned to the key's partition.
+     *
+     * @param aff Affinity function.
+     * @param key Key to check.
+     * @return Primary node for given key.
+     * @throws IgniteCheckedException If no nodes are assigned to the key's partition.
+     */
+    private <K> ClusterNode primary(AffinityInfo aff, K key) throws IgniteCheckedException {
+        Object affKey = aff.mapper.affinityKey(key);
+
+        int part = aff.affFunc.partition(affKey);
+
+        Collection<ClusterNode> owners = aff.assignment.get(part);
+
+        if (!F.isEmpty(owners))
+            return owners.iterator().next();
+
+        throw new IgniteCheckedException("Failed to get affinity nodes [aff=" + aff + ", key=" + key + ']');
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Prints the grid name and the current number of entries in the affinity map.
+     */
+    @Override public void printMemoryStats() {
+        X.println(">>>");
+        X.println(">>> Affinity processor memory stats [grid=" + 
ctx.gridName() + ']');
+        X.println(">>>   affMapSize: " + affMap.size());
+    }
+
+    /**
+     * Immutable holder of everything needed to map keys of one cache at one topology version:
+     * affinity function, affinity key mapper, partition assignment and portable flag.
+     * <p>
+     * Instances are shared between threads through the futures stored in the affinity map,
+     * so all fields are final.
+     */
+    private static class AffinityInfo {
+        /** Affinity function. */
+        private final GridCacheAffinityFunction affFunc;
+
+        /** Affinity key mapper. */
+        private final GridCacheAffinityKeyMapper mapper;
+
+        /** Assignment. */
+        private final GridAffinityAssignment assignment;
+
+        /** Portable enabled flag. */
+        private final boolean portableEnabled;
+
+        /**
+         * @param affFunc Affinity function.
+         * @param mapper Affinity key mapper.
+         * @param assignment Partition assignment.
+         * @param portableEnabled Portable enabled flag.
+         */
+        private AffinityInfo(GridCacheAffinityFunction affFunc, GridCacheAffinityKeyMapper mapper,
+            GridAffinityAssignment assignment, boolean portableEnabled) {
+            this.affFunc = affFunc;
+            this.mapper = mapper;
+            this.assignment = assignment;
+            this.portableEnabled = portableEnabled;
+        }
+    }
+
+    /**
+     * Key of the affinity map: cache name (may be {@code null}) plus topology version.
+     * <p>
+     * Both fields take part in {@link #equals(Object)} and {@link #hashCode()}, so they are final:
+     * a key must not change after it has been put into a hash-based map.
+     */
+    private static class AffinityAssignmentKey {
+        /** Cache name. */
+        private final String cacheName;
+
+        /** Topology version. */
+        private final long topVer;
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Topology version.
+         */
+        private AffinityAssignmentKey(String cacheName, long topVer) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof AffinityAssignmentKey))
+                return false;
+
+            AffinityAssignmentKey that = (AffinityAssignmentKey)o;
+
+            return topVer == that.topVer && F.eq(cacheName, that.cacheName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = cacheName != null ? cacheName.hashCode() : 0;
+
+            // Fold the 64-bit version into 32 bits.
+            res = 31 * res + (int)(topVer ^ (topVer >>> 32));
+
+            return res;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
new file mode 100644
index 0000000..07bf390
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.task.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Affinity utility methods.
+ * <p>
+ * Provides a factory for the job that collects affinity data for a cache ({@link #affinityJob}),
+ * plus the helpers that wrap an affinity object into a {@link GridAffinityMessage} and turn such
+ * a message back into an object ({@link #unmarshall}).
+ */
+class GridAffinityUtils {
+    /**
+     * Creates a job that will look up {@link org.apache.ignite.cache.affinity.GridCacheAffinityKeyMapper}
+     * and {@link org.apache.ignite.cache.affinity.GridCacheAffinityFunction} on a cache with given name.
+     * The job serializes them together with all deployment information needed to unmarshal the objects
+     * on a remote node. Result is returned as a {@link GridTuple3}, where first object is
+     * {@link GridAffinityMessage} for {@link org.apache.ignite.cache.affinity.GridCacheAffinityFunction},
+     * second object is {@link GridAffinityMessage} for
+     * {@link org.apache.ignite.cache.affinity.GridCacheAffinityKeyMapper} and third object is affinity
+     * assignment for given topology version.
+     *
+     * @param cacheName Cache name.
+     * @param topVer Topology version for which the assignment is requested.
+     * @return Affinity job.
+     */
+    static Callable<GridTuple3<GridAffinityMessage, GridAffinityMessage, 
GridAffinityAssignment>> affinityJob(
+        String cacheName, long topVer) {
+        return new AffinityJob(cacheName, topVer);
+    }
+
+    /**
+     * Deploys the class of the given object through the deployment manager and wraps the marshalled
+     * object into a transfer message together with its class name, class loader ID, deployment mode,
+     * user version and deployment participants.
+     *
+     * @param ctx {@code GridKernalContext} instance which provides deployment manager and marshaller.
+     * @param o Object to wrap; dereferenced immediately, so it must not be {@code null}.
+     * @return Message holding the marshalled object and its deployment information.
+     * @throws IgniteCheckedException If node cannot create deployment for given object
+     *      (an {@code IgniteDeploymentException} is thrown when the deployment manager returns {@code null}).
+     */
+    private static GridAffinityMessage affinityMessage(GridKernalContext ctx, 
Object o) throws IgniteCheckedException {
+        Class cls = o.getClass();
+
+        GridDeployment dep = ctx.deploy().deploy(cls, cls.getClassLoader());
+
+        if (dep == null)
+            throw new IgniteDeploymentException("Failed to deploy affinity 
object with class: " + cls.getName());
+
+        return new GridAffinityMessage(
+            ctx.config().getMarshaller().marshal(o),
+            cls.getName(),
+            dep.classLoaderId(),
+            dep.deployMode(),
+            dep.userVersion(),
+            dep.participants());
+    }
+
+    /**
+     * Unmarshalls transfer object from remote node within a given context.
+     * <p>
+     * The global deployment is looked up using the message's source class name as both the second
+     * and the third argument of {@code getGlobalDeployment}. The payload is unmarshalled with that
+     * deployment's class loader, and resources are injected into the result before it is returned.
+     *
+     * @param ctx Grid kernal context that provides deployment and marshalling services.
+     * @param sndNodeId {@link UUID} of the sender node.
+     * @param msg Transfer object that contains original serialized object and deployment information.
+     * @return Unmarshalled object.
+     * @throws IgniteCheckedException If node cannot obtain deployment.
+     */
+    static Object unmarshall(GridKernalContext ctx, UUID sndNodeId, 
GridAffinityMessage msg)
+        throws IgniteCheckedException {
+        GridDeployment dep = ctx.deploy().getGlobalDeployment(
+            msg.deploymentMode(),
+            msg.sourceClassName(),
+            msg.sourceClassName(),
+            msg.userVersion(),
+            sndNodeId,
+            msg.classLoaderId(),
+            msg.loaderParticipants(),
+            null);
+
+        if (dep == null)
+            throw new IgniteDeploymentException("Failed to obtain affinity 
object (is peer class loading turned on?): " +
+                msg);
+
+        Object src = ctx.config().getMarshaller().unmarshal(msg.source(), 
dep.classLoader());
+
+        // Resource injection.
+        ctx.resource().inject(dep, dep.deployedClass(msg.sourceClassName()), 
src);
+
+        return src;
+    }
+
+    /** Prevents instantiation; this class only exposes static helpers. */
+    private GridAffinityUtils() {
+        // No-op.
+    }
+
+    /**
+     * Job that gathers the affinity function, the affinity key mapper and the affinity assignment
+     * of one cache for one topology version.
+     * <p>
+     * Only the cache name and the topology version are written by {@link #writeExternal}; the
+     * {@code ignite} and {@code log} fields are not part of the serialized form.
+     */
+    @GridInternal
+    private static class AffinityJob implements
+        Callable<GridTuple3<GridAffinityMessage, GridAffinityMessage, 
GridAffinityAssignment>>,
+        Externalizable {
+        /** Serialization version. */
+        private static final long serialVersionUID = 0L;
+
+        /** Grid instance; never assigned in this class and asserted non-null in {@link #call()}. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Logger; never assigned in this class and only asserted non-null in {@link #call()}. */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** Name of the cache whose affinity data is collected. */
+        private String cacheName;
+
+        /** Topology version for which the assignment is read. */
+        private long topVer;
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Topology version.
+         */
+        private AffinityJob(@Nullable String cacheName, long topVer) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+        }
+
+        /**
+         * Public no-arg constructor required by {@link Externalizable}; state is restored
+         * afterwards by {@link #readExternal}.
+         */
+        public AffinityJob() {
+            // No-op.
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Casts the {@code ignite} field to {@code GridKernal}, obtains the context of the internal
+         * cache named {@code cacheName} and returns a tuple of: message for the configured affinity
+         * function, message for the configured affinity mapper, and the assignment for {@code topVer}.
+         * <p>
+         * NOTE(review): {@code internalCache(cacheName)} is dereferenced before the {@code cctx}
+         * assertion; confirm it cannot return {@code null} for an unknown cache name.
+         */
+        @Override public GridTuple3<GridAffinityMessage, GridAffinityMessage, 
GridAffinityAssignment> call()
+            throws Exception {
+            assert ignite != null;
+            assert log != null;
+
+            GridKernal kernal = ((GridKernal) ignite);
+
+            GridCacheContext<Object, Object> cctx = 
kernal.internalCache(cacheName).context();
+
+            assert cctx != null;
+
+            GridKernalContext ctx = kernal.context();
+
+            return F.t(
+                affinityMessage(ctx, cctx.config().getAffinity()),
+                affinityMessage(ctx, cctx.config().getAffinityMapper()),
+                new GridAffinityAssignment(topVer, 
cctx.affinity().assignments(topVer)));
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Writes the cache name followed by the topology version.
+         */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            U.writeString(out, cacheName);
+            out.writeLong(topVer);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Reads the fields in the order {@link #writeExternal} wrote them: cache name, then topology version.
+         */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            cacheName = U.readString(in);
+            topVer = in.readLong();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java
new file mode 100644
index 0000000..718980b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridCacheAffinityFunctionContextImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Cache affinity function context implementation. Simple bean that holds all required fields:
+ * every value is stored by the constructor and returned unchanged by the matching accessor.
+ */
+public class GridCacheAffinityFunctionContextImpl implements 
GridCacheAffinityFunctionContext {
+    /** Topology snapshot. */
+    private List<ClusterNode> topSnapshot;
+
+    /** Previous affinity assignment. */
+    private List<List<ClusterNode>> prevAssignment;
+
+    /** Discovery event that caused this topology change. */
+    private IgniteDiscoveryEvent discoEvt;
+
+    /** Topology version. */
+    private long topVer;
+
+    /** Number of backups to assign. */
+    private int backups;
+
+    /**
+     * @param topSnapshot Topology snapshot.
+     * @param prevAssignment Previous affinity assignment, indexed by partition number
+     *      (see {@link #previousAssignment(int)}).
+     * @param discoEvt Discovery event that caused this topology change.
+     * @param topVer Topology version.
+     * @param backups Number of backups to assign.
+     */
+    public GridCacheAffinityFunctionContextImpl(List<ClusterNode> topSnapshot, 
List<List<ClusterNode>> prevAssignment,
+        IgniteDiscoveryEvent discoEvt, long topVer, int backups) {
+        this.topSnapshot = topSnapshot;
+        this.prevAssignment = prevAssignment;
+        this.discoEvt = discoEvt;
+        this.topVer = topVer;
+        this.backups = backups;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the element of the previous assignment list at index {@code part}.
+     * NOTE(review): the list itself is dereferenced without a null check even though the method is
+     * annotated {@code @Nullable}; confirm callers never construct this context with a {@code null}
+     * previous assignment.
+     */
+    @Nullable @Override public List<ClusterNode> previousAssignment(int part) {
+        return prevAssignment.get(part);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> currentTopologySnapshot() {
+        return topSnapshot;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long currentTopologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteDiscoveryEvent discoveryEvent() {
+        return discoEvt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int backups() {
+        return backups;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/package.html
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/package.html
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/package.html
new file mode 100644
index 0000000..5bb1751
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/package.html
@@ -0,0 +1,23 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 
"http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+    <!-- Package description. -->
+    Data affinity processor.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java
new file mode 100644
index 0000000..9c34e9d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.closure;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * This enumeration defines different types of closure
+ * processing by the closure processor. Each constant names
+ * the execution pool a closure is associated with.
+ */
+public enum GridClosurePolicy {
+    /** Public execution pool. */
+    PUBLIC_POOL,
+
+    /** P2P execution pool. */
+    P2P_POOL,
+
+    /** System execution pool. */
+    SYSTEM_POOL,
+
+    /** GGFS pool. */
+    GGFS_POOL;
+
+    /** Enum values, cached once so that {@link #fromOrdinal(int)} does not call {@code values()} on every lookup. */
+    private static final GridClosurePolicy[] VALS = values();
+
+    /**
+     * Efficiently gets enumerated value from its ordinal.
+     *
+     * @param ord Ordinal value.
+     * @return Enumerated value, or {@code null} if the ordinal is negative or not less than the number of constants.
+     */
+    @Nullable public static GridClosurePolicy fromOrdinal(int ord) {
+        return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+    }
+}

Reply via email to