ignite-5212 Allow custom affinity function for data structures cache (cherry picked from commit f353faf)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/34a0d5f5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/34a0d5f5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/34a0d5f5 Branch: refs/heads/ignite-5398 Commit: 34a0d5f5f7c97c4f401c14b4211af35eaa34e850 Parents: b29b918 Author: Vladislav Pyatkov <[email protected]> Authored: Tue May 23 15:33:39 2017 +0300 Committer: vd-pyatkov <[email protected]> Committed: Tue May 23 16:00:16 2017 +0300 ---------------------------------------------------------------------- .../configuration/AtomicConfiguration.java | 25 +++ .../org/apache/ignite/internal/IgnitionEx.java | 1 + .../AtomicCacheAffinityConfigurationTest.java | 175 +++++++++++++++++++ .../IgniteCacheDataStructuresSelfTestSuite.java | 3 + 4 files changed, 204 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/34a0d5f5/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java index 6649b5e..169bc4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java @@ -19,6 +19,7 @@ package org.apache.ignite.configuration; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.internal.util.typedef.internal.S; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -45,6 +46,9 @@ public class AtomicConfiguration { /** Number of backups. */ private int backups = DFLT_BACKUPS; + /** Affinity function */ + private AffinityFunction aff; + /** * @return Number of backup nodes. */ @@ -98,6 +102,27 @@ public class AtomicConfiguration { this.seqReserveSize = seqReserveSize; } + /** + * Gets atomic cache affinity function. + * + * @return Affinity function or null, if not set. + */ + public AffinityFunction getAffinity() { + return aff; + } + + /** + * Sets atomic cache affinity function. + * + * @param aff Affinity function. + * @return {@code this} for chaining. + */ + public AtomicConfiguration setAffinity(AffinityFunction aff) { + this.aff = aff; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(AtomicConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/34a0d5f5/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index cead53b..85e8a79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -2324,6 +2324,7 @@ public class IgnitionEx { ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setCacheMode(cfg.getCacheMode()); ccfg.setNodeFilter(CacheConfiguration.ALL_NODES); + ccfg.setAffinity(cfg.getAffinity()); ccfg.setRebalanceOrder(-1); //Prior to user caches. if (cfg.getCacheMode() == PARTITIONED) http://git-wip-us.apache.org/repos/asf/ignite/blob/34a0d5f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AtomicCacheAffinityConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AtomicCacheAffinityConfigurationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AtomicCacheAffinityConfigurationTest.java new file mode 100644 index 0000000..7b7d9b5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AtomicCacheAffinityConfigurationTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.AtomicConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + */ +public class AtomicCacheAffinityConfigurationTest extends GridCommonAbstractTest { + /** Affinity function. */ + private AffinityFunction affinityFunction; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setAtomicConfiguration(new AtomicConfiguration() + .setCacheMode(CacheMode.PARTITIONED) + .setAffinity(affinityFunction)); + } + + /** + * @throws Exception If failed. + * + */ + public void testRendezvousAffinity() throws Exception { + try { + affinityFunction = new RendezvousAffinityFunction(false, 10); + + startGrids(3); + + for (int i = 0; i < 3; i++) { + IgniteEx igniteEx = grid(i); + + CacheConfiguration cConf = igniteEx.context().cache().cache("ignite-atomics-sys-cache").configuration(); + + AffinityFunction aff = cConf.getAffinity(); + + assertNotNull(aff); + + assertEquals(aff.partitions(), affinityFunction.partitions()); + + assertEquals(aff.getClass(), affinityFunction.getClass()); + } + + checkAtomics(); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testTestAffinity() throws Exception { + try { + affinityFunction = new TestAffinityFunction("Some value"); + + startGrids(3); + + for (int i = 0; i < 3; i++) { + IgniteEx igniteEx = grid(i); + + CacheConfiguration cConf = igniteEx.context().cache().cache("ignite-atomics-sys-cache").configuration(); + + TestAffinityFunction aff = (TestAffinityFunction)cConf.getAffinity(); + + assertNotNull(aff); + + assertEquals(aff.partitions(), affinityFunction.partitions()); + + assertEquals(aff.getCustomAttribute(), ((TestAffinityFunction)affinityFunction).getCustomAttribute()); + } + + checkAtomics(); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testDefaultAffinity() throws Exception { + try { + affinityFunction = null; + + startGrids(3); + + for (int i = 0; i < 3; i++) { + IgniteEx igniteEx = grid(i); + + CacheConfiguration cConf = igniteEx.context().cache().cache("ignite-atomics-sys-cache").configuration(); + + assertNotNull(cConf.getAffinity()); + } + + checkAtomics(); + } + finally { + stopAllGrids(); + } + } + + /** + * + */ + private void checkAtomics() { + Ignite node0 = grid(0); + + node0.atomicLong("l1", 0, true).incrementAndGet(); + node0.atomicSequence("s1", 10, true); + + for (int i = 0; i < 3; i++) { + assertEquals(1, ignite(i).atomicLong("l1", 0, false).get()); + + assertNotNull(ignite(i).atomicSequence("s1", 0, false)); + + ignite(i).atomicSequence("s1", 0, false).getAndIncrement(); + } + } + + /** + * Test affinity function. + */ + private static class TestAffinityFunction extends RendezvousAffinityFunction { + /** */ + private String customAttr; + + /** + * Default constructor. + */ + public TestAffinityFunction() { + // No-op. + } + + /** + * @param customAttr Custom attribute. + */ + TestAffinityFunction(String customAttr) { + this.customAttr = customAttr; + } + + /** + * @return Custom attribute. + */ + String getCustomAttribute() { + return customAttr; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/34a0d5f5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java index d14aa4e..052787b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.AtomicCacheAffinityConfigurationTest; import org.apache.ignite.internal.processors.cache.datastructures.GridCacheQueueCleanupSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.GridCacheQueueMultiNodeConsistencySelfTest; import org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDataStructuresTest; @@ -188,6 +189,8 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(IgnitePartitionedQueueNoBackupsTest.class)); + suite.addTestSuite(AtomicCacheAffinityConfigurationTest.class); + return suite; } }
