Merge master into ignite-3477-master
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/48494478 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/48494478 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/48494478 Branch: refs/heads/master Commit: 48494478f14eb76b731a483e868d5e1bc7c58bb7 Parents: 1218c41 d298e75 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Thu Apr 13 16:30:54 2017 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Thu Apr 13 16:30:54 2017 +0300 ---------------------------------------------------------------------- .../jmh/future/JmhFutureAdapterBenchmark.java | 145 +++ .../java/org/apache/ignite/IgniteCache.java | 265 ++-- .../rendezvous/RendezvousAffinityFunction.java | 283 ++-- .../ignite/internal/IgniteInternalFuture.java | 15 - .../eventstorage/GridEventStorageManager.java | 341 ++--- .../cache/GridCacheCompoundFuture.java | 63 + .../cache/GridCacheCompoundIdentityFuture.java | 63 + .../processors/cache/GridCacheFuture.java | 15 + .../cache/GridCacheFutureAdapter.java | 61 + .../GridCachePartitionExchangeManager.java | 16 + .../distributed/GridCacheTxRecoveryFuture.java | 9 +- .../dht/CacheDistributedGetFutureAdapter.java | 4 +- .../distributed/dht/GridDhtLockFuture.java | 33 +- .../distributed/dht/GridDhtTxFinishFuture.java | 4 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 16 +- .../dht/GridPartitionedSingleGetFuture.java | 4 +- .../GridDhtAtomicAbstractUpdateFuture.java | 4 +- .../GridNearAtomicAbstractUpdateFuture.java | 8 +- .../GridNearAtomicSingleUpdateFuture.java | 24 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 24 +- .../colocated/GridDhtColocatedLockFuture.java | 23 +- .../GridDhtPartitionsExchangeFuture.java | 35 +- .../distributed/near/GridNearLockFuture.java | 20 +- ...arOptimisticSerializableTxPrepareFuture.java | 2 +- .../near/GridNearOptimisticTxPrepareFuture.java | 9 +- .../GridNearPessimisticTxPrepareFuture.java | 5 +- .../near/GridNearTxFinishFuture.java | 13 +- .../cache/distributed/near/GridNearTxLocal.java | 7 +- .../near/GridNearTxPrepareFutureAdapter.java | 4 +- .../cache/local/GridLocalLockFuture.java | 6 +- .../query/GridCacheDistributedQueryFuture.java | 18 +- .../query/GridCacheQueryFutureAdapter.java | 31 +- .../transactions/IgniteTransactionsImpl.java | 4 +- .../cache/transactions/TxDeadlockDetection.java | 5 +- .../datastructures/GridCacheSemaphoreImpl.java | 12 + .../platform/compute/PlatformCompute.java | 10 - .../tcp/GridTcpMemcachedNioListener.java | 20 +- .../util/future/GridCompoundFuture.java | 45 +- .../util/future/GridFinishedFuture.java | 13 - .../internal/util/future/GridFutureAdapter.java | 479 ++++--- .../internal/util/future/IgniteFutureImpl.java | 10 - .../ignite/internal/util/lang/GridFunc.java | 1223 ++---------------- .../ignite/internal/util/lang/GridTupleV.java | 1 - .../lang/gridfunc/AlwaysFalsePredicate.java | 46 + .../util/lang/gridfunc/AlwaysTruePredicate.java | 46 + .../util/lang/gridfunc/AlwaysTrueReducer.java | 56 + .../gridfunc/AtomicIntegerFactoryCallable.java | 40 + .../gridfunc/CacheEntryGetValueClosure.java | 42 + .../gridfunc/CacheEntryHasPeekPredicate.java | 41 + .../lang/gridfunc/ClusterNodeGetIdClosure.java | 41 + .../ConcurrentDequeFactoryCallable.java | 40 + .../ConcurrentHashSetFactoryCallable.java | 40 + .../gridfunc/ConcurrentMapFactoryCallable.java | 41 + .../lang/gridfunc/ContainsNodeIdsPredicate.java | 52 + .../util/lang/gridfunc/ContainsPredicate.java | 55 + .../gridfunc/EntryByKeyEvaluationPredicate.java | 53 + .../gridfunc/EqualsClusterNodeIdPredicate.java | 51 + .../util/lang/gridfunc/EqualsUuidPredicate.java | 50 + .../lang/gridfunc/FlatCollectionWrapper.java | 66 + .../util/lang/gridfunc/FlatIterator.java | 104 ++ .../util/lang/gridfunc/HasEqualIdPredicate.java | 51 + .../lang/gridfunc/HasNotEqualIdPredicate.java | 51 + .../util/lang/gridfunc/IdentityClosure.java | 39 + .../util/lang/gridfunc/IntSumReducer.java | 51 + .../util/lang/gridfunc/IsAllPredicate.java | 52 + .../util/lang/gridfunc/IsNotAllPredicate.java | 52 + .../util/lang/gridfunc/IsNotNullPredicate.java | 44 + .../util/lang/gridfunc/LongSumReducer.java | 51 + .../util/lang/gridfunc/MapFactoryCallable.java | 41 + .../util/lang/gridfunc/MultipleIterator.java | 106 ++ .../util/lang/gridfunc/NoOpClosure.java | 39 + .../lang/gridfunc/NotContainsPredicate.java | 54 + .../util/lang/gridfunc/NotEqualPredicate.java | 53 + .../lang/gridfunc/PredicateCollectionView.java | 78 ++ .../util/lang/gridfunc/PredicateMapView.java | 121 ++ .../util/lang/gridfunc/PredicateSetView.java | 153 +++ .../lang/gridfunc/ReadOnlyCollectionView.java | 95 ++ .../lang/gridfunc/ReadOnlyCollectionView2X.java | 100 ++ .../lang/gridfunc/RunnableWrapperClosure.java | 51 + .../util/lang/gridfunc/SetFactoryCallable.java | 41 + .../util/lang/gridfunc/StringConcatReducer.java | 79 ++ .../util/lang/gridfunc/ToStringClosure.java | 42 + .../lang/gridfunc/TransformCollectionView.java | 79 ++ .../gridfunc/TransformFilteringIterator.java | 138 ++ .../util/lang/gridfunc/TransformMapView.java | 168 +++ .../util/lang/gridfunc/TransformMapView2.java | 165 +++ .../util/lang/gridfunc/package-info.java | 22 + .../org/apache/ignite/lang/IgniteFuture.java | 15 - .../TransactionDeadlockException.java | 4 +- .../transactions/TransactionException.java | 80 ++ .../TransactionHeuristicException.java | 4 +- .../TransactionOptimisticException.java | 4 +- .../TransactionRollbackException.java | 4 +- .../TransactionTimeoutException.java | 4 +- ...inityFunctionFastPowerOfTwoHashSelfTest.java | 17 - ...ndezvousAffinityFunctionSimpleBenchmark.java | 1100 ++++++++++++++++ ...ousAffinityFunctionStandardHashSelfTest.java | 17 - .../IgniteClientReconnectCacheTest.java | 16 +- .../internal/binary/BinaryEnumsSelfTest.java | 2 + .../cache/CacheKeepBinaryTransactionTest.java | 121 ++ .../GridCacheOrderedPreloadingSelfTest.java | 48 +- .../GridCachePartitionedAffinitySpreadTest.java | 169 --- ...SemaphoreFailoverSafeReleasePermitsTest.java | 9 +- ...dCachePartitionedQueueEntryMoveSelfTest.java | 2 +- .../near/GridCacheNearTxForceKeyTest.java | 6 +- ...cheRebalancingPartitionDistributionTest.java | 2 +- .../datastreamer/DataStreamerImplSelfTest.java | 36 - ...gniteServiceConfigVariationsFullApiTest.java | 9 +- .../IgniteServiceDynamicCachesSelfTest.java | 24 +- .../util/future/IgniteFutureImplTest.java | 38 - .../ignite/testframework/GridTestNode.java | 12 +- .../testsuites/IgniteCacheTestSuite5.java | 2 + .../external/HadoopExternalTaskExecutor.java | 2 +- ...PartitionOnAffinityRunAtomicCacheOpTest.java | 46 +- .../h2/GridIndexingSpiAbstractSelfTest.java | 2 - .../Binary/BinaryBuilderSelfTest.cs | 4 +- .../Binary/BinaryDynamicRegistrationTest.cs | 11 +- .../Binary/BinarySelfTest.cs | 4 +- .../IgniteConfigurationSerializerTest.cs | 15 +- .../Binary/BinaryConfiguration.cs | 24 +- .../IgniteConfigurationSection.xsd | 8 +- .../Binary/BinarySurrogateTypeDescriptor.cs | 10 +- .../Impl/Binary/BinaryUtils.cs | 8 +- .../Impl/Binary/Marshaller.cs | 14 +- .../processors/schedule/ScheduleFutureImpl.java | 20 - 125 files changed, 5727 insertions(+), 2458 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 47cd9fe,1a7c2c6..388a434 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@@ -38,7 -38,7 +38,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheLockCandidates; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; + import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 4f75480,23d7657..4faa475 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@@ -27,8 -27,8 +27,9 @@@ import org.apache.ignite.IgniteCheckedE import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; + import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 70fa1d1,964d423..6e7b324 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@@ -42,7 -42,7 +42,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; import org.apache.ignite.internal.processors.cache.CacheLockCandidates; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; + import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 46fb30c,8512298..4442b3a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@@ -208,9 -202,8 +208,9 @@@ public final class GridDhtColocatedLock this.filter = filter; this.skipStore = skipStore; this.keepBinary = keepBinary; + this.recovery = recovery; - ignoreInterrupts(true); + ignoreInterrupts(); threadId = tx == null ? Thread.currentThread().getId() : tx.threadId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 103bd49,55aca2a..f6827ab --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -187,11 -171,8 +187,8 @@@ public class GridDhtPartitionsExchangeF @GridToStringInclude private volatile IgniteInternalFuture<?> partReleaseFut; - /** */ - private final Object mux = new Object(); - /** Logger. */ - private IgniteLogger log; + private final IgniteLogger log; /** Dynamic cache change requests. */ private Collection<DynamicCacheChangeRequest> reqs; @@@ -1446,10 -1216,10 +1443,10 @@@ if (updateSingleMap) { try { - updatePartitionSingleMap(msg); + updatePartitionSingleMap(node, msg); } finally { - synchronized (mux) { + synchronized (this) { assert pendingSingleUpdates > 0; pendingSingleUpdates--; http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index eb953d9,8de01c9..b94c014 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@@ -214,9 -208,8 +214,9 @@@ public final class GridNearLockFuture e this.filter = filter; this.skipStore = skipStore; this.keepBinary = keepBinary; + this.recovery = recovery; - ignoreInterrupts(true); + ignoreInterrupts(); threadId = tx == null ? Thread.currentThread().getId() : tx.threadId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java index 0000000,3e5bae9..16f8e97 mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java @@@ -1,0 -1,1100 +1,1100 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.cache.affinity.rendezvous; + + import java.io.Externalizable; + import java.io.ObjectInput; + import java.io.ObjectOutput; + import java.io.Serializable; + import java.security.MessageDigest; + import java.security.NoSuchAlgorithmException; + import java.util.Collection; + import java.util.Collections; + import java.util.Comparator; + import org.apache.ignite.Ignite; + import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.IgniteException; + import org.apache.ignite.IgniteLogger; + import org.apache.ignite.cache.affinity.AffinityFunction; + import org.apache.ignite.cache.affinity.AffinityFunctionContext; + import org.apache.ignite.cache.affinity.AffinityNodeHashResolver; -import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; + import org.apache.ignite.cluster.ClusterNode; + import org.apache.ignite.configuration.CacheConfiguration; + import org.apache.ignite.configuration.IgniteConfiguration; + import org.apache.ignite.events.DiscoveryEvent; + import org.apache.ignite.events.EventType; + import org.apache.ignite.internal.IgniteNodeAttributes; + import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; + import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; + import org.apache.ignite.internal.processors.cache.GridCacheUtils; + import org.apache.ignite.internal.util.typedef.F; + import org.apache.ignite.internal.util.typedef.internal.A; + import org.apache.ignite.internal.util.typedef.internal.LT; + import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.lang.IgniteBiPredicate; + import org.apache.ignite.lang.IgniteBiTuple; + import org.apache.ignite.resources.IgniteInstanceResource; + import org.apache.ignite.resources.LoggerResource; + import org.apache.ignite.testframework.GridTestNode; + import org.apache.ignite.testframework.GridTestUtils; + import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + + import java.io.IOException; + import java.io.PrintStream; + import java.nio.file.FileSystems; + import java.nio.file.Files; + import java.util.ArrayList; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.UUID; + import java.util.concurrent.atomic.AtomicInteger; + import org.jetbrains.annotations.NotNull; + import org.jetbrains.annotations.Nullable; + + /** + * Simple benchmarks, compatibility test and distribution check utils for affinity functions. + * Needs to check changes at the {@link RendezvousAffinityFunction}. + */ + public class RendezvousAffinityFunctionSimpleBenchmark extends GridCommonAbstractTest { + /** MAC prefix. */ + private static final String MAC_PREF = "MAC"; + + /** Ignite. */ + private static Ignite ignite; + + /** Max experiments. */ + private static final int MAX_EXPERIMENTS = 200; + + /** Max experiments. */ + private TopologyModificationMode mode = TopologyModificationMode.CHANGE_LAST_NODE; + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 3600 * 1000; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite = startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @param nodesCnt Count of nodes to generate. + * @return Nodes list. + */ + private List<ClusterNode> createBaseNodes(int nodesCnt) { + List<ClusterNode> nodes = new ArrayList<>(nodesCnt); + + for (int i = 0; i < nodesCnt; i++) { + GridTestNode node = new GridTestNode(UUID.randomUUID()); + + // two neighbours nodes + node.setAttribute(IgniteNodeAttributes.ATTR_MACS, MAC_PREF + i / 2); + + nodes.add(node); + } + return nodes; + } + + /** + * Modify the topology by remove the last / add new node. + * + * @param nodes Topology. + * @param prevAssignment Previous afinity. + * @param iter Number of iteration. + * @param backups Backups count. + * @return Affinity context. + */ + private GridAffinityFunctionContextImpl nodesModificationChangeLast(List<ClusterNode> nodes, + List<List<ClusterNode>> prevAssignment, int iter, int backups) { + DiscoveryEvent discoEvt; + + discoEvt = iter % 2 == 0 ? addNode(nodes, iter) : removeNode(nodes, nodes.size() - 1); + + return new GridAffinityFunctionContextImpl(nodes, + prevAssignment, discoEvt, new AffinityTopologyVersion(nodes.size()), backups); + } + + /** + * @param nodes Topology. + * @param idx Index of node to remove. + * @return Discovery event. + */ + @NotNull private DiscoveryEvent removeNode(List<ClusterNode> nodes, int idx) { + return new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_LEFT, nodes.remove(idx)); + } + + /** + * Modify the topology by remove the first node / add new node + * + * @param nodes Topology. + * @param prevAssignment Previous affinity. + * @param iter Number of iteration. + * @param backups Backups count. + * @return Affinity context. + */ + private GridAffinityFunctionContextImpl nodesModificationChangeFirst(List<ClusterNode> nodes, + List<List<ClusterNode>> prevAssignment, int iter, int backups) { + DiscoveryEvent discoEvt; + + discoEvt = iter % 2 == 0 ? addNode(nodes, iter) : removeNode(nodes, 0); + + return new GridAffinityFunctionContextImpl(nodes, + prevAssignment, discoEvt, new AffinityTopologyVersion(nodes.size()), backups); + } + + /** + * @param nodes Topology. + * @param iter Iteration count. + * @return Discovery event. + */ + @NotNull private DiscoveryEvent addNode(List<ClusterNode> nodes, int iter) { + GridTestNode node = new GridTestNode(UUID.randomUUID()); + + // two neighbours nodes + node.setAttribute(IgniteNodeAttributes.ATTR_MACS, MAC_PREF + "_add_" + iter / 4); + + nodes.add(node); + + return new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_JOINED, node); + } + + /** + * + * @param aff Affinity function. + * @param nodes Topology. + * @param iter Number of iteration. + * @param prevAssignment Previous affinity assignment. + * @param backups Backups count. + * @return Tuple with affinity and time spend of the affinity calculation. + */ + private IgniteBiTuple<Long, List<List<ClusterNode>>> assignPartitions(AffinityFunction aff, + List<ClusterNode> nodes, List<List<ClusterNode>> prevAssignment, int backups, int iter) { + + GridAffinityFunctionContextImpl ctx = null; + switch (mode) { + case CHANGE_LAST_NODE: + ctx = nodesModificationChangeLast(nodes, prevAssignment, iter, backups); + break; + case CHANGE_FIRST_NODE: + ctx = nodesModificationChangeFirst(nodes, prevAssignment, iter, backups); + break; + + case ADD: + ctx = new GridAffinityFunctionContextImpl(nodes, + prevAssignment, addNode(nodes, iter), new AffinityTopologyVersion(nodes.size()), backups); + break; + + case REMOVE_RANDOM: + ctx = new GridAffinityFunctionContextImpl(nodes, + prevAssignment, removeNode(nodes, nodes.size() - 1), + new AffinityTopologyVersion(nodes.size()), backups); + break; + + case NONE: + ctx = new GridAffinityFunctionContextImpl(nodes, + prevAssignment, + new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_JOINED, nodes.get(nodes.size() - 1)), + new AffinityTopologyVersion(nodes.size()), backups); + break; + + } + + long start = System.currentTimeMillis(); + + List<List<ClusterNode>> assignments = aff.assignPartitions(ctx); + + return F.t(System.currentTimeMillis() - start, assignments); + } + + /** + * @param lst List pf measures. + * @return Average of measures. + */ + private double average(Collection<Long> lst) { + if (lst.isEmpty()) + return 0; + + long sum = 0; + + for (long l : lst) + sum += l; + + return (double)sum / lst.size(); + } + + /** + * @param lst List pf measures. + * @param avg Average of the measures. + * @return Variance of the measures. + */ + private double variance(Collection<Long> lst, double avg) { + if (lst.isEmpty()) + return 0; + + long sum = 0; + + for (long l : lst) + sum += (l - avg) * (l - avg); + + return Math.sqrt((double)sum / lst.size()); + } + + /** + * The table with count of partitions on node: + * + * column 0 - primary partitions counts + * column 1 - backup#0 partitions counts + * etc + * + * Rows correspond to the nodes. + * + * @param lst Affinity result. + * @param nodes Topology. + * @return Frequency distribution: counts of partitions on node. + */ + private static List<List<Integer>> freqDistribution(List<List<ClusterNode>> lst, Collection<ClusterNode> nodes) { + List<Map<ClusterNode, AtomicInteger>> nodeMaps = new ArrayList<>(); + + int backups = lst.get(0).size(); + + for (int i = 0; i < backups; ++i) { + Map<ClusterNode, AtomicInteger> map = new HashMap<>(); + + for (List<ClusterNode> l : lst) { + ClusterNode node = l.get(i); + + if (!map.containsKey(node)) + map.put(node, new AtomicInteger(1)); + else + map.get(node).incrementAndGet(); + } + + nodeMaps.add(map); + } + + List<List<Integer>> byNodes = new ArrayList<>(nodes.size()); + for (ClusterNode node : nodes) { + List<Integer> byBackups = new ArrayList<>(backups); + + for (int j = 0; j < backups; ++j) { + if (nodeMaps.get(j).get(node) == null) + byBackups.add(0); + else + byBackups.add(nodeMaps.get(j).get(node).get()); + } + + byNodes.add(byBackups); + } + return byNodes; + } + + /** + * @param byNodes Frequency distribution. + * @param suffix Label suffix. + * @throws IOException On error. + */ + private void printDistribution(Collection<List<Integer>> byNodes, String suffix) throws IOException { + int nodes = byNodes.size(); + + try (PrintStream ps = new PrintStream(Files.newOutputStream(FileSystems.getDefault() + .getPath(String.format("%03d", nodes) + suffix)))) { + + for (List<Integer> byNode : byNodes) { + for (int w : byNode) + ps.print(String.format("%05d ", w)); + + ps.println(""); + } + } + } + + /** + * Chi-square test of the distribution with uniform distribution. + * + * @param byNodes Distribution. + * @param parts Partitions count. + * @param goldenNodeWeight Weight of according the uniform distribution. + * @return Chi-square test. + */ + private double chiSquare(List<List<Integer>> byNodes, int parts, double goldenNodeWeight) { + double sum = 0; + + for (List<Integer> byNode : byNodes) { + double w = (double)byNode.get(0) / parts; + + sum += (goldenNodeWeight - w) * (goldenNodeWeight - w) / goldenNodeWeight; + } + return sum; + } + + /** + * @throws IOException On error. + */ + public void testDistribution() throws IOException { + AffinityFunction aff0 = new RendezvousAffinityFunction(true, 1024); + + AffinityFunction aff1 = new RendezvousAffinityFunctionOld(true, 1024); + + GridTestUtils.setFieldValue(aff1, "ignite", ignite); + + affinityDistribution(aff0, aff1); + } + + /** + * + * @param aff0 Affinity function to compare. + * @param aff1 Affinity function to compare. + */ + private void affinityDistribution(AffinityFunction aff0, AffinityFunction aff1) { + int[] nodesCnts = {5, 64, 100, 128, 200, 256, 300, 400, 500, 600}; + + for (int nodesCnt : nodesCnts) { + List<ClusterNode> nodes0 = createBaseNodes(nodesCnt); + List<ClusterNode> nodes1 = createBaseNodes(nodesCnt); + + assignPartitions(aff0, nodes0, null, 2, 0).get2(); + List<List<ClusterNode>> lst0 = assignPartitions(aff0, nodes0, null, 2, 1).get2(); + + assignPartitions(aff1, nodes1, null, 2, 0).get2(); + List<List<ClusterNode>> lst1 = assignPartitions(aff1, nodes1, null, 2, 1).get2(); + + List<List<Integer>> dist0 = freqDistribution(lst0, nodes0); + List<List<Integer>> dist1 = freqDistribution(lst1, nodes1); + + info(String.format("Chi^2. Test %d nodes. %s: %f; %s: %f;", + nodesCnt, + aff0.getClass().getSimpleName(), + chiSquare(dist0, aff0.partitions(), 1.0 / nodesCnt), + aff1.getClass().getSimpleName(), + chiSquare(dist1, aff0.partitions(), 1.0 / nodesCnt))); + + try { + printDistribution(dist0, "." + aff0.getClass().getSimpleName()); + printDistribution(dist1, "." + aff1.getClass().getSimpleName()); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + /** + * + */ + public void testAffinityBenchmarkAdd() { + mode = TopologyModificationMode.ADD; + + AffinityFunction aff0 = new RendezvousAffinityFunctionOld(true, 1024); + + GridTestUtils.setFieldValue(aff0, "ignite", ignite); + + affinityBenchmark(aff0, new RendezvousAffinityFunction(true, 1024)); + } + + /** + * + */ + public void testAffinityBenchmarkChangeLast() { + mode = TopologyModificationMode.CHANGE_LAST_NODE; + + AffinityFunction aff0 = new RendezvousAffinityFunctionOld(true, 1024); + + GridTestUtils.setFieldValue(aff0, "ignite", ignite); + + affinityBenchmark(aff0, new RendezvousAffinityFunction(true, 1024)); + } + + /** + * @param aff0 Affinity function. to compare. + * @param aff1 Affinity function. to compare. + */ + private void affinityBenchmark(AffinityFunction aff0, AffinityFunction aff1) { + int[] nodesCnts = {100, 4, 100, 200, 300, 400, 500, 600}; + + final int backups = 2; + + for (int nodesCnt : nodesCnts) { + List<ClusterNode> nodes0 = createBaseNodes(nodesCnt); + List<ClusterNode> nodes1 = createBaseNodes(nodesCnt); + + List<Long> times0 = new ArrayList<>(MAX_EXPERIMENTS); + List<Long> times1 = new ArrayList<>(MAX_EXPERIMENTS); + + List<List<ClusterNode>> prevAssignment = + assignPartitions(aff0, nodes0, null, backups, 0).get2(); + + for (int i = 0; i < MAX_EXPERIMENTS; ++i) { + IgniteBiTuple<Long, List<List<ClusterNode>>> aa + = assignPartitions(aff0, nodes0, prevAssignment, backups, i); + + prevAssignment = aa.get2(); + + times0.add(aa.get1()); + } + + prevAssignment = assignPartitions(aff1, nodes1, null, backups, 0).get2(); + + for (int i = 0; i < MAX_EXPERIMENTS; ++i) { + IgniteBiTuple<Long, List<List<ClusterNode>>> aa + = assignPartitions(aff1, nodes1, prevAssignment, backups, i); + + prevAssignment = aa.get2(); + + times1.add(aa.get1()); + } + + double avr0 = average(times0); + double var0 = variance(times0, avr0); + + double avr1 = average(times1); + double var1 = variance(times1, avr1); + + info(String.format("Test %d nodes. %s: %.1f ms +/- %.3f ms; %s: %.1f ms +/- %.3f ms;", + nodesCnt, + aff0.getClass().getSimpleName(), + avr0, var0, + aff1.getClass().getSimpleName(), + avr1, var1)); + } + } + + /** + * + * @param affOld Old affinity. + * @param affNew New affinity/ + * @return Count of partitions to migrate. + */ + private int countPartitionsToMigrate(List<List<ClusterNode>> affOld, List<List<ClusterNode>> affNew) { + if (affOld == null || affNew == null) + return 0; + + assertEquals(affOld.size(), affNew.size()); + + int diff = 0; + for (int i = 0; i < affOld.size(); ++i) { + Collection<ClusterNode> s0 = new HashSet<>(affOld.get(i)); + Iterable<ClusterNode> s1 = new HashSet<>(affNew.get(i)); + + for (ClusterNode n : s1) { + if (!s0.contains(n)) + ++diff; + } + } + + return diff; + } + + /** + * + */ + public void testPartitionsMigrate() { + int[] nodesCnts = {2, 3, 10, 64, 100, 200, 300, 400, 500, 600}; + + final int backups = 2; + + AffinityFunction aff0 = new RendezvousAffinityFunction(true, 256); - AffinityFunction aff1 = new FairAffinityFunction(true, 256); ++ // TODO choose another affinity function to compare. ++ AffinityFunction aff1 = new RendezvousAffinityFunction(true, 256); + + for (int nodesCnt : nodesCnts) { + List<ClusterNode> nodes0 = createBaseNodes(nodesCnt); + List<ClusterNode> nodes1 = createBaseNodes(nodesCnt); + + List<List<ClusterNode>> affPrev = null; + + int diffCnt0 = 0; + + affPrev = assignPartitions(aff0, nodes0, null, backups, 0).get2(); + for (int i = 0; i < MAX_EXPERIMENTS; ++i) { + List<List<ClusterNode>> affCur = assignPartitions(aff0, nodes0, affPrev, backups, i).get2(); + diffCnt0 += countPartitionsToMigrate(affPrev, affCur); + affPrev = affCur; + } + + affPrev = assignPartitions(aff1, nodes1, null, backups, 0).get2(); + int diffCnt1 = 0; + for (int i = 0; i < MAX_EXPERIMENTS; ++i) { + List<List<ClusterNode>> affCur = assignPartitions(aff1, nodes1, affPrev, backups, i).get2(); + diffCnt1 += countPartitionsToMigrate(affPrev, affCur); + affPrev = affCur; + } + + double goldenChangeAffinity = (double)aff1.partitions() / nodesCnt * (backups + 1); + info(String.format("Test %d nodes. Golden: %.1f; %s: %.1f; %s: %.1f;", + nodesCnt, goldenChangeAffinity, + aff0.getClass().getSimpleName(), + (double)diffCnt0 / (MAX_EXPERIMENTS - 1), + aff1.getClass().getSimpleName(), + (double)diffCnt1 / (MAX_EXPERIMENTS - 1))); + } + } + + /** + * + */ + public void _testAffinityCompatibility() { + mode = TopologyModificationMode.ADD; + + AffinityFunction aff0 = new RendezvousAffinityFunction(true, 1024); + + // Use the full copy of the old implementaion of the RendezvousAffinityFunction to check the compatibility. + AffinityFunction aff1 = new RendezvousAffinityFunctionOld(true, 1024); + GridTestUtils.setFieldValue(aff1, "ignite", ignite); + + affinityCompatibility(aff0, aff1); + } + + /** + * @param aff0 Affinity function to compare. + * @param aff1 Affinity function to compare. + */ + private void affinityCompatibility(AffinityFunction aff0, AffinityFunction aff1) { + int[] nodesCnts = {64, 100, 200, 300, 400, 500, 600}; + + final int backups = 2; + + mode = TopologyModificationMode.NONE; + + for (int nodesCnt : nodesCnts) { + List<ClusterNode> nodes = createBaseNodes(nodesCnt); + + List<List<ClusterNode>> assignment0 = assignPartitions(aff0, nodes, null, backups, 0).get2(); + + List<List<ClusterNode>> assignment1 = assignPartitions(aff1, nodes, null, backups, 0).get2(); + + assertEquals (assignment0, assignment1); + } + } + + /** + * + */ + private enum TopologyModificationMode { + /** Change the last node. */ + CHANGE_LAST_NODE, + + /** Change the first node. */ + CHANGE_FIRST_NODE, + + /** Add. */ + ADD, + + /** Remove random. */ + REMOVE_RANDOM, + + /** Do nothing*/ + NONE + } + + /** + * Full copy of the old implementation of the RendezvousAffinityFunction to check compatibility and performance. + */ + private static class RendezvousAffinityFunctionOld implements AffinityFunction, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Default number of partitions. */ + public static final int DFLT_PARTITION_COUNT = 1024; + + /** Comparator. */ + private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator(); + + /** Thread local message digest. */ + private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() { + @Override protected MessageDigest initialValue() { + try { + return MessageDigest.getInstance("MD5"); + } + catch (NoSuchAlgorithmException e) { + assert false : "Should have failed in constructor"; + + throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e); + } + } + }; + + /** Number of partitions. */ + private int parts; + + /** Exclude neighbors flag. */ + private boolean exclNeighbors; + + /** Exclude neighbors warning. */ + private transient boolean exclNeighborsWarn; + + /** Optional backup filter. First node is primary, second node is a node being tested. */ + private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; + + /** Optional affinity backups filter. The first node is a node being tested, + * the second is a list of nodes that are already assigned for a given partition (the first node in the list + * is primary). */ + private IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter; + + /** Hash ID resolver. */ + private AffinityNodeHashResolver hashIdRslvr = null; + + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Logger instance. */ + @LoggerResource + private transient IgniteLogger log; + + /** + * Empty constructor with all defaults. + */ + public RendezvousAffinityFunctionOld() { + this(false); + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other + * and specified number of backups. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other. + */ + public RendezvousAffinityFunctionOld(boolean exclNeighbors) { + this(exclNeighbors, DFLT_PARTITION_COUNT); + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, + * and specified number of backups and partitions. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other. + * @param parts Total number of partitions. + */ + public RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts) { + this(exclNeighbors, parts, null); + } + + /** + * Initializes optional counts for replicas and backups. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param parts Total number of partitions. + * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected + * from all nodes that pass this filter. First argument for this filter is primary node, and second + * argument is node being tested. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + */ + public RendezvousAffinityFunctionOld(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + this(false, parts, backupFilter); + } + + /** + * Private constructor. + * + * @param exclNeighbors Exclude neighbors flag. + * @param parts Partitions count. + * @param backupFilter Backup filter. + */ + private RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts, + IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + A.ensure(parts > 0, "parts > 0"); + + this.exclNeighbors = exclNeighbors; + this.parts = parts; + this.backupFilter = backupFilter; + + try { + MessageDigest.getInstance("MD5"); + } + catch (NoSuchAlgorithmException e) { + throw new IgniteException("Failed to obtain MD5 message digest instance.", e); + } + } + + /** + * Gets total number of key partitions. To ensure that all partitions are + * equally distributed across all nodes, please make sure that this + * number is significantly larger than a number of nodes. Also, partition + * size should be relatively small. Try to avoid having partitions with more + * than quarter million keys. + * <p> + * Note that for fully replicated caches this method should always + * return {@code 1}. + * + * @return Total partition count. + */ + public int getPartitions() { + return parts; + } + + /** + * Sets total number of partitions. + * + * @param parts Total number of partitions. + */ + public void setPartitions(int parts) { + A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT); + + this.parts = parts; + } + + /** + * Gets hash ID resolver for nodes. This resolver is used to provide + * alternate hash ID, other than node ID. + * <p> + * Node IDs constantly change when nodes get restarted, which causes them to + * be placed on different locations in the hash ring, and hence causing + * repartitioning. Providing an alternate hash ID, which survives node restarts, + * puts node on the same location on the hash ring, hence minimizing required + * repartitioning. + * + * @return Hash ID resolver. + */ + @Deprecated + public AffinityNodeHashResolver getHashIdResolver() { + return hashIdRslvr; + } + + /** + * Sets hash ID resolver for nodes. This resolver is used to provide + * alternate hash ID, other than node ID. + * <p> + * Node IDs constantly change when nodes get restarted, which causes them to + * be placed on different locations in the hash ring, and hence causing + * repartitioning. Providing an alternate hash ID, which survives node restarts, + * puts node on the same location on the hash ring, hence minimizing required + * repartitioning. + * + * @param hashIdRslvr Hash ID resolver. + * + * @deprecated Use {@link IgniteConfiguration#setConsistentId(Serializable)} instead. + */ + @Deprecated + public void setHashIdResolver(AffinityNodeHashResolver hashIdRslvr) { + this.hashIdRslvr = hashIdRslvr; + } + + /** + * Gets optional backup filter. If not {@code null}, backups will be selected + * from all nodes that pass this filter. First node passed to this filter is primary node, + * and second node is a node being tested. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @return Optional backup filter. + */ + @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() { + return backupFilter; + } + + /** + * Sets optional backup filter. If provided, then backups will be selected from all + * nodes that pass this filter. First node being passed to this filter is primary node, + * and second node is a node being tested. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param backupFilter Optional backup filter. + * @deprecated Use {@code affinityBackupFilter} instead. + */ + @Deprecated + public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + this.backupFilter = backupFilter; + } + + /** + * Gets optional backup filter. If not {@code null}, backups will be selected + * from all nodes that pass this filter. First node passed to this filter is a node being tested, + * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is + * the first in the list). + * <p> + * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @return Optional backup filter. + */ + @Nullable public IgniteBiPredicate<ClusterNode, List<ClusterNode>> getAffinityBackupFilter() { + return affinityBackupFilter; + } + + /** + * Sets optional backup filter. If provided, then backups will be selected from all + * nodes that pass this filter. First node being passed to this filter is a node being tested, + * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is + * the first in the list). + * <p> + * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param affinityBackupFilter Optional backup filter. + */ + public void setAffinityBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, + List<ClusterNode>> affinityBackupFilter) { + this.affinityBackupFilter = affinityBackupFilter; + } + + /** + * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @return {@code True} if nodes residing on the same host may not act as backups of each other. + */ + public boolean isExcludeNeighbors() { + return exclNeighbors; + } + + /** + * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. + */ + public void setExcludeNeighbors(boolean exclNeighbors) { + this.exclNeighbors = exclNeighbors; + } + + /** + * Resolves node hash. + * + * @param node Cluster node; + * @return Node hash. + */ + public Object resolveNodeHash(ClusterNode node) { + if (hashIdRslvr != null) + return hashIdRslvr.resolve(node); + else + return node.consistentId(); + } + + /** + * Returns collection of nodes (primary first) for specified partition. + * + * @param d Message digest. + * @param part Partition. + * @param nodes Nodes. + * @param nodesHash Serialized nodes hashes. + * @param backups Number of backups. + * @param neighborhoodCache Neighborhood. + * @return Assignment. + */ + public List<ClusterNode> assignPartition(MessageDigest d, + int part, + List<ClusterNode> nodes, + Map<ClusterNode, byte[]> nodesHash, + int backups, + @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) { + if (nodes.size() <= 1) + return nodes; + + if (d == null) + d = digest.get(); + + List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size()); + + try { + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + + byte[] nodeHashBytes = nodesHash.get(node); + + if (nodeHashBytes == null) { + Object nodeHash = resolveNodeHash(node); + + byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash); + + // Add 4 bytes for partition bytes. + nodeHashBytes = new byte[nodeHashBytes0.length + 4]; + + System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length); + + nodesHash.put(node, nodeHashBytes); + } + + U.intToBytes(part, nodeHashBytes, 0); + + d.reset(); + + byte[] bytes = d.digest(nodeHashBytes); + + long hash = + (bytes[0] & 0xFFL) + | ((bytes[1] & 0xFFL) << 8) + | ((bytes[2] & 0xFFL) << 16) + | ((bytes[3] & 0xFFL) << 24) + | ((bytes[4] & 0xFFL) << 32) + | ((bytes[5] & 0xFFL) << 40) + | ((bytes[6] & 0xFFL) << 48) + | ((bytes[7] & 0xFFL) << 56); + + lst.add(F.t(hash, node)); + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + Collections.sort(lst, COMPARATOR); + + int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size()); + + List<ClusterNode> res = new ArrayList<>(primaryAndBackups); + + ClusterNode primary = lst.get(0).get2(); + + res.add(primary); + + // Select backups. + if (backups > 0) { + for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) { + IgniteBiTuple<Long, ClusterNode> next = lst.get(i); + + ClusterNode node = next.get2(); + + if (exclNeighbors) { + Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res); + + if (!allNeighbors.contains(node)) + res.add(node); + } + else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res)) + res.add(next.get2()); + else if (backupFilter != null && backupFilter.apply(primary, node)) + res.add(next.get2()); + else if (affinityBackupFilter == null && backupFilter == null) + res.add(next.get2()); + } + } + + if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) { + // Need to iterate again in case if there are no nodes which pass exclude neighbors backups criteria. + for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) { + IgniteBiTuple<Long, ClusterNode> next = lst.get(i); + + ClusterNode node = next.get2(); + + if (!res.contains(node)) + res.add(next.get2()); + } + + if (!exclNeighborsWarn) { + LT.warn(log, "Affinity function excludeNeighbors property is ignored " + + "because topology has no enough nodes to assign backups."); + + exclNeighborsWarn = true; + } + } + + assert res.size() <= primaryAndBackups; + + return res; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int partitions() { + return parts; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + if (key == null) + throw new IllegalArgumentException("Null key is passed for a partition calculation. " + + "Make sure that an affinity key that is used is initialized properly."); + + return U.safeAbs(key.hashCode() % parts); + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + List<List<ClusterNode>> assignments = new ArrayList<>(parts); + + Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? + GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null; + + MessageDigest d = digest.get(); + + List<ClusterNode> nodes = affCtx.currentTopologySnapshot(); + + Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size()); + + for (int i = 0; i < parts; i++) { + List<ClusterNode> partAssignment = assignPartition(d, + i, + nodes, + nodesHash, + affCtx.backups(), + neighborhoodCache); + + assignments.add(partAssignment); + } + + return assignments; + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(parts); + out.writeBoolean(exclNeighbors); + out.writeObject(hashIdRslvr); + out.writeObject(backupFilter); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + parts = in.readInt(); + exclNeighbors = in.readBoolean(); + hashIdRslvr = (AffinityNodeHashResolver)in.readObject(); + backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject(); + } + + /** + * + */ + private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) { + return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 : + o1.get2().id().compareTo(o2.get2().id()); + } + } + } + } http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java index 3e79da0,0716c20..7baea2e --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java @@@ -18,11 -18,7 +18,12 @@@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.GridCacheAffinityBackupsSelfTest; +import org.apache.ignite.IgniteCacheAffinitySelfTest; +import org.apache.ignite.cache.affinity.AffinityClientNodeSelfTest; +import org.apache.ignite.cache.affinity.AffinityHistoryCleanupTest; +import org.apache.ignite.cache.affinity.local.LocalAffinityFunctionTest; + import org.apache.ignite.internal.processors.cache.CacheKeepBinaryTransactionTest; import org.apache.ignite.internal.processors.cache.CacheNearReaderUpdateTest; import org.apache.ignite.internal.processors.cache.CacheRebalancingSelfTest; import org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest; @@@ -58,10 -53,12 +59,11 @@@ public class IgniteCacheTestSuite5 exte suite.addTestSuite(IgniteCacheStoreCollectionTest.class); suite.addTestSuite(IgniteCacheWriteBehindNoUpdateSelfTest.class); suite.addTestSuite(IgniteCachePutStackOverflowSelfTest.class); - suite.addTestSuite(GridCacheSwapSpaceSpiConsistencySelfTest.class); + suite.addTestSuite(CacheKeepBinaryTransactionTest.class); suite.addTestSuite(CacheLateAffinityAssignmentTest.class); - suite.addTestSuite(CacheLateAffinityAssignmentFairAffinityTest.class); suite.addTestSuite(CacheLateAffinityAssignmentNodeJoinValidationTest.class); + suite.addTestSuite(IgniteActiveOnStartNodeJoinValidationSelfTest.class); suite.addTestSuite(EntryVersionConsistencyReadThroughTest.class); suite.addTestSuite(IgniteCacheSyncRebalanceModeSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinarySurrogateTypeDescriptor.cs ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs ---------------------------------------------------------------------- diff --cc modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs index 61439b1,1dae576..f47cbe2 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs @@@ -497,8 -505,8 +497,8 @@@ namespace Apache.Ignite.Core.Impl.Binar var ser = GetSerializer(_cfg, null, type, typeId, null, null, _log); desc = desc == null - ? new BinaryFullTypeDescriptor(type, typeId, typeName, true, _cfg.DefaultNameMapper, - _cfg.DefaultIdMapper, ser, false, null, type.IsEnum, registered) + ? new BinaryFullTypeDescriptor(type, typeId, typeName, true, _cfg.NameMapper, - _cfg.IdMapper, ser, false, null, type.IsEnum, null, registered) ++ _cfg.IdMapper, ser, false, null, type.IsEnum, registered) : new BinaryFullTypeDescriptor(desc, type, ser, registered); if (RegistrationDisabled)