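IGNITE-1481: It is possible to configure local cache with affinity function. Fix: when a LOCAL cache is configured with an affinity function other than LocalAffinityFunction, the configured function is now replaced with LocalAffinityFunction and a warning is logged. Reviewed and merged by Denis Magda ([email protected])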
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/96c8c246 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/96c8c246 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/96c8c246 Branch: refs/heads/ignite-db-x-10884 Commit: 96c8c246b0553e91a577ff493da46716d8e15572 Parents: 6578c8b Author: kcheng.mvp <[email protected]> Authored: Tue Apr 12 15:27:39 2016 +0300 Committer: Denis Magda <[email protected]> Committed: Tue Apr 12 15:28:21 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 35 +++++---- .../local/LocalAffinityFunctionTest.java | 80 ++++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 3 files changed, 101 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/96c8c246/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java old mode 100644 new mode 100755 index 38f861b..5214078 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -255,6 +255,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { aff.setHashIdResolver(new AffinityNodeAddressHashResolver()); } } + else if (cfg.getCacheMode() == LOCAL && !(cfg.getAffinity() instanceof LocalAffinityFunction)) { + cfg.setAffinity(new LocalAffinityFunction()); + + U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache" + + " [cacheName=" 
+ U.maskName(cfg.getName()) + ']'); + } } if (cfg.getCacheMode() == REPLICATED) @@ -583,7 +589,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @SuppressWarnings( {"unchecked"}) + @SuppressWarnings({"unchecked"}) @Override public void start() throws IgniteCheckedException { DeploymentMode depMode = ctx.config().getDeploymentMode(); @@ -1252,7 +1258,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { else prepare(cfg, cfg.getCacheStoreFactory(), false); - CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null; validate(ctx.config(), cfg, cacheType, cfgStore); @@ -1548,6 +1553,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } ); } + /** * Gets cache mode. * @@ -1590,8 +1596,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Starts statically configured caches received from remote nodes during exchange. * * @param topVer Topology version. - * @throws IgniteCheckedException If failed. * @return Started caches descriptors. + * @throws IgniteCheckedException If failed. */ public Collection<DynamicCacheDescriptor> startReceivedCaches(AffinityTopologyVersion topVer) throws IgniteCheckedException { @@ -2135,7 +2141,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { break; } - if (cfg.getName() != null ) { + if (cfg.getName() != null) { if (cfg.getName().endsWith("*")) { if (cfg.getName().length() > 1) { if (wildcardNameCfgs == null) @@ -2350,7 +2356,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { return F.first(initiateCacheChanges(F.asList(t), false)); } - /** * @param cacheName Cache name to close. * @return Future that will be completed when cache is closed. 
@@ -2511,7 +2516,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (msg instanceof CacheAffinityChangeMessage) return sharedCtx.affinity().onCustomEvent(((CacheAffinityChangeMessage)msg)); - return msg instanceof DynamicCacheChangeBatch && onCacheChangeRequested((DynamicCacheChangeBatch) msg, topVer); + return msg instanceof DynamicCacheChangeBatch && onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer); } /** @@ -2623,7 +2628,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.nearCacheConfiguration() != null); } else { - if (req.failIfExists() ) { + if (req.failIfExists()) { if (fut != null) fut.onDone(new CacheExistsException("Failed to start cache " + "(a cache with the same name is already started): " + U.maskName(req.cacheName()))); @@ -2923,7 +2928,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param keyBytes Key bytes. * @param valBytes Value bytes. */ - @SuppressWarnings( {"unchecked"}) + @SuppressWarnings({"unchecked"}) public void onEvictFromSwap(String spaceName, byte[] keyBytes, byte[] valBytes) { assert spaceName != null; assert keyBytes != null; @@ -3149,8 +3154,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { @SuppressWarnings({"unchecked", "ConstantConditions"}) @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted, - boolean checkThreadTx) throws IgniteCheckedException - { + boolean checkThreadTx) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Getting public cache for name: " + cacheName); @@ -3360,8 +3364,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Callback invoked by deployment manager for whenever a class loader - * gets undeployed. + * Callback invoked by deployment manager for whenever a class loader gets undeployed. * * @param ldr Class loader. 
*/ @@ -3474,7 +3477,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param objs Extra components. * @return Components provided in cache configuration which can implement {@link LifecycleAware} interface. */ - private Iterable<Object> lifecycleAwares(CacheConfiguration ccfg, Object...objs) { + private Iterable<Object> lifecycleAwares(CacheConfiguration ccfg, Object... objs) { Collection<Object> ret = new ArrayList<>(7 + objs.length); ret.add(ccfg.getAffinity()); @@ -3503,8 +3506,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param val Object to check. - * @throws IgniteCheckedException If validation failed. * @return Configuration copy. + * @throws IgniteCheckedException If validation failed. */ private CacheConfiguration cloneCheckSerializable(final CacheConfiguration val) throws IgniteCheckedException { if (val == null) @@ -3541,8 +3544,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param c Closure. - * @throws IgniteCheckedException If failed. * @return Closure result. + * @throws IgniteCheckedException If failed. */ private <T> T withBinaryContext(IgniteOutClosureX<T> c) throws IgniteCheckedException { IgniteCacheObjectProcessor objProc = ctx.cacheObjects(); @@ -3574,7 +3577,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { return marshaller.unmarshal(marshaller.marshal(obj), U.resolveClassLoader(ctx.config())); } }); - }; + } /** * @param name Name to mask. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c8c246/modules/core/src/test/java/org/apache/ignite/cache/affinity/local/LocalAffinityFunctionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/local/LocalAffinityFunctionTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/local/LocalAffinityFunctionTest.java new file mode 100755 index 0000000..abd15ea --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/local/LocalAffinityFunctionTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.affinity.local; + +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test for local affinity function. + */ +public class LocalAffinityFunctionTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODE_CNT = 1; + + /** */ + private static final String CACHE1 = "cache1"; + + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setBackups(1); + ccfg.setName(CACHE1); + ccfg.setCacheMode(CacheMode.LOCAL); + ccfg.setAffinity(new FairAffinityFunction()); + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + @Override + protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + startGrids(NODE_CNT); + } + + @Override + protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + stopAllGrids(); + } + + public void testWronglySetAffinityFunctionForLocalCache() { + Ignite node = ignite(NODE_CNT - 1); + + CacheConfiguration ccf = node.cache(CACHE1).getConfiguration(CacheConfiguration.class); + + assertEquals("org.apache.ignite.internal.processors.cache.GridCacheProcessor$LocalAffinityFunction", + 
ccf.getAffinity().getClass().getName()); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/96c8c246/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java old mode 100644 new mode 100755 index 0fdf817..7edd0a0 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -27,6 +27,7 @@ import org.apache.ignite.cache.affinity.AffinityHistoryCleanupTest; import org.apache.ignite.cache.affinity.fair.FairAffinityDynamicCacheSelfTest; import org.apache.ignite.cache.affinity.fair.FairAffinityFunctionNodesSelfTest; import org.apache.ignite.cache.affinity.fair.FairAffinityFunctionSelfTest; +import org.apache.ignite.cache.affinity.local.LocalAffinityFunctionTest; import org.apache.ignite.cache.store.GridCacheBalancingStoreSelfTest; import org.apache.ignite.cache.store.GridCacheLoadOnlyStoreAdapterSelfTest; import org.apache.ignite.cache.store.StoreResourceInjectionSelfTest; @@ -211,6 +212,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheAffinityBackupsSelfTest.class); suite.addTestSuite(IgniteCacheAffinitySelfTest.class); suite.addTestSuite(AffinityClientNodeSelfTest.class); + suite.addTestSuite(LocalAffinityFunctionTest.class); suite.addTestSuite(AffinityHistoryCleanupTest.class); // Swap tests.
