Repository: ignite Updated Branches: refs/heads/master e5a467272 -> e1f8f46f9
IGNITE-8006 Parallelize cache groups start - Fixes #4752. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e1f8f46f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e1f8f46f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e1f8f46f Branch: refs/heads/master Commit: e1f8f46f90868d377bc764b74d07812150218c71 Parents: e5a4672 Author: Anton Kalashnikov <[email protected]> Authored: Mon Oct 22 16:27:49 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Oct 22 16:27:49 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 7 + .../cache/CacheAffinitySharedManager.java | 115 ++-- .../processors/cache/GridCacheIoManager.java | 22 +- .../GridCachePartitionExchangeManager.java | 4 +- .../processors/cache/GridCacheProcessor.java | 523 ++++++++++++++----- .../processors/cache/StartCacheInfo.java | 113 ++++ .../GridDhtPartitionsExchangeFuture.java | 78 ++- .../ignite/internal/util/IgniteUtils.java | 104 ++-- .../internal/util/InitializationProtector.java | 79 +++ .../util/lang/IgniteThrowableConsumer.java | 37 ++ .../util/lang/IgniteThrowableRunner.java | 30 ++ .../distributed/CacheStartInParallelTest.java | 219 ++++++++ .../IgniteCrossCacheTxStoreSelfTest.java | 44 +- .../internal/util/IgniteUtilsSelfTest.java | 74 +++ .../testsuites/IgniteCacheTestSuite7.java | 3 + 15 files changed, 1165 insertions(+), 287 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 521222c..6afe244 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -1030,6 +1030,13 @@ public final class IgniteSystemProperties { public static final String IGNITE_REBALANCE_THROTTLE_OVERRIDE = "IGNITE_REBALANCE_THROTTLE_OVERRIDE"; /** + * Enables start caches in parallel. + * + * Default is {@code true}. + */ + public static final String IGNITE_ALLOW_START_CACHES_IN_PARALLEL = "IGNITE_ALLOW_START_CACHES_IN_PARALLEL"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index cedbde1..6e10c00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -17,12 +17,14 @@ package org.apache.ignite.internal.processors.cache; +import javax.cache.CacheException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -30,7 +32,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; -import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import 
org.apache.ignite.IgniteSystemProperties; @@ -427,25 +428,43 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap Map<Integer, GridDhtAssignmentFetchFuture> fetchFuts = U.newHashMap(startDescs.size()); - Set<String> startedCaches = U.newHashSet(startDescs.size()); - Map<Integer, Boolean> startedInfos = U.newHashMap(startDescs.size()); - for (DynamicCacheDescriptor desc : startDescs) { - try { - startedCaches.add(desc.cacheName()); + List<StartCacheInfo> startCacheInfos = startDescs.stream() + .map(desc -> { + DynamicCacheChangeRequest changeReq = startReqs.get(desc.cacheName()); - DynamicCacheChangeRequest startReq = startReqs.get(desc.cacheName()); - - cctx.cache().prepareCacheStart( - desc.cacheConfiguration(), + return new StartCacheInfo( desc, - startReq.nearCacheConfiguration(), + changeReq.nearCacheConfiguration(), topVer, - startReq.disabledAfterStart() + changeReq.disabledAfterStart() ); + }) + .collect(Collectors.toList()); + + Set<String> startedCaches = startCacheInfos.stream() + .map(info -> info.getCacheDescriptor().cacheName()) + .collect(Collectors.toSet()); + + try { + cctx.cache().prepareStartCaches(startCacheInfos); + } + catch (IgniteCheckedException e) { + cctx.cache().closeCaches(startedCaches, false); + + cctx.cache().completeClientCacheChangeFuture(msg.requestId(), e); + + return null; + } + + for (StartCacheInfo startCacheInfo : startCacheInfos) { + try { + DynamicCacheDescriptor desc = startCacheInfo.getCacheDescriptor(); + + startedCaches.add(desc.cacheName()); - startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null); + startedInfos.put(desc.cacheId(), startCacheInfo.getReqNearCfg() != null); CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); @@ -860,6 +879,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap long time = System.currentTimeMillis(); + Map<StartCacheInfo, DynamicCacheChangeRequest> startCacheInfos = new LinkedHashMap<>(); + for 
(ExchangeActions.CacheActionData action : exchActions.cacheStartRequests()) { DynamicCacheDescriptor cacheDesc = action.descriptor(); @@ -895,29 +916,41 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } } - try { - if (startCache) { - cctx.cache().prepareCacheStart( + if (startCache) { + startCacheInfos.put( + new StartCacheInfo( req.startCacheConfiguration(), cacheDesc, nearCfg, evts.topologyVersion(), req.disabledAfterStart() - ); - - if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) { - if (fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) - U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName()); - } - } + ), + req + ); } - catch (IgniteCheckedException e) { - U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. " + - "[cacheName=" + req.cacheName() + ']', e); + } + + Map<StartCacheInfo, IgniteCheckedException> failedCaches = cctx.cache().prepareStartCachesIfPossible(startCacheInfos.keySet()); - cctx.cache().closeCaches(Collections.singleton(req.cacheName()), false); + failedCaches.forEach((cacheInfo, exception) -> { + U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. 
" + + "[cacheName=" + cacheInfo.getStartedConfiguration().getName() + ']', exception); - cctx.cache().completeCacheStartFuture(req, false, e); + cctx.cache().closeCaches(Collections.singleton(cacheInfo.getStartedConfiguration().getName()), false); + + cctx.cache().completeCacheStartFuture(startCacheInfos.get(cacheInfo), false, exception); + }); + + Set<StartCacheInfo> failedCacheInfos = failedCaches.keySet(); + + List<StartCacheInfo> cacheInfos = startCacheInfos.keySet().stream() + .filter(failedCacheInfos::contains) + .collect(Collectors.toList()); + + for (StartCacheInfo info : cacheInfos) { + if (fut.cacheAddedOnExchange(info.getCacheDescriptor().cacheId(), info.getCacheDescriptor().receivedFrom())) { + if (fut.events().discoveryCache().cacheGroupAffinityNodes(info.getCacheDescriptor().groupId()).isEmpty()) + U.quietAndWarn(log, "No server nodes found for cache client: " + info.getCacheDescriptor().cacheName()); } } @@ -952,22 +985,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap U.doInParallel( cctx.kernalContext().getSystemExecutorService(), startedGroups, - new IgniteInClosureX<CacheGroupDescriptor>() { - @Override public void applyx(CacheGroupDescriptor grpDesc) throws IgniteCheckedException { - if (crd) - initStartedGroupOnCoordinator(fut, grpDesc); - else { - CacheGroupContext grp = cctx.cache().cacheGroup(grpDesc.groupId()); + grpDesc -> { + if (crd) + initStartedGroupOnCoordinator(fut, grpDesc); + else { + CacheGroupContext grp = cctx.cache().cacheGroup(grpDesc.groupId()); - if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.initialVersion())) { - assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion(); + if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.initialVersion())) { + assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion(); - 
initAffinity(cachesRegistry.group(grp.groupId()), grp.affinity(), fut); - } + initAffinity(cachesRegistry.group(grp.groupId()), grp.affinity(), fut); } } - }, - null); + } + ); } /** @@ -1228,7 +1259,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap .collect(Collectors.toList()); try { - U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c, null); + U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c::applyx); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to execute affinity operation on cache groups", e); @@ -1255,7 +1286,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } try { - U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c, null); + U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c::applyx); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to execute affinity operation on cache groups", e); http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 2e66e5b..3116d31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -1340,22 +1340,22 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (msgIdx != -1) { Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; - IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(hndId); + 
IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.compute(hndId, (key, clsHandlers) -> { + if (clsHandlers == null) + clsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX]; - if (cacheClsHandlers == null) { - cacheClsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX]; + if(clsHandlers[msgIdx] != null) + return null; - idxClsHandlers0.put(hndId, cacheClsHandlers); - } + clsHandlers[msgIdx] = c; + + return clsHandlers; + }); - if (cacheClsHandlers[msgIdx] != null) + if (cacheClsHandlers == null) throw new IgniteException("Duplicate cache message ID found [hndId=" + hndId + ", type=" + type + ']'); - cacheClsHandlers[msgIdx] = c; - - msgHandlers.idxClsHandlers = idxClsHandlers0; - return; } else { @@ -1572,7 +1572,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ static class MessageHandlers { /** Indexed class handlers. */ - volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>(); + volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new ConcurrentHashMap<>(); /** Handler registry. 
*/ ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 8b8efa3..b0e0d0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -109,6 +109,7 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -2622,7 +2623,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana err = e; } catch (Throwable e) { - err = e; + if (!(stop && X.hasCause(e, IgniteInterruptedCheckedException.class))) + err = e; } finally { if (err == null && !stop && !reconnectNeeded) http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 
4a6bed4..59703c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import javax.management.MBeanServer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -68,6 +69,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgniteTransactionsEx; import org.apache.ignite.internal.binary.BinaryContext; @@ -136,15 +138,16 @@ import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions; import org.apache.ignite.internal.util.F0; +import org.apache.ignite.internal.util.InitializationProtector; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainClosure; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; +import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import 
org.apache.ignite.internal.util.typedef.internal.CU; @@ -193,6 +196,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_C import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache; +import static org.apache.ignite.internal.util.IgniteUtils.doInParallel; /** * Cache processor. @@ -214,6 +218,10 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor private final boolean walFsyncWithDedicatedWorker = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER, false); + /** Enables start caches in parallel. */ + private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL = + IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, true); + /** Shared cache context. */ private GridCacheSharedContext<?, ?> sharedCtx; @@ -266,6 +274,9 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor /** MBean group for cache group metrics */ private final String CACHE_GRP_METRICS_MBEAN_GRP = "Cache groups"; + /** Protector of initialization of specific value. */ + private final InitializationProtector initializationProtector = new InitializationProtector(); + /** * @param ctx Kernal context. */ @@ -1285,71 +1296,6 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor } /** - * @param cache Cache to start. - * @param schema Cache schema. - * @throws IgniteCheckedException If failed to start cache. 
- */ - @SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) - private void startCache(GridCacheAdapter<?, ?> cache, QuerySchema schema) throws IgniteCheckedException { - GridCacheContext<?, ?> cacheCtx = cache.context(); - - CacheConfiguration cfg = cacheCtx.config(); - - // Intentionally compare Boolean references using '!=' below to check if the flag has been explicitly set. - if (cfg.isStoreKeepBinary() && cfg.isStoreKeepBinary() != CacheConfiguration.DFLT_STORE_KEEP_BINARY - && !(ctx.config().getMarshaller() instanceof BinaryMarshaller)) - U.warn(log, "CacheConfiguration.isStoreKeepBinary() configuration property will be ignored because " + - "BinaryMarshaller is not used"); - - // Start managers. - for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx)))) - mgr.start(cacheCtx); - - cacheCtx.initConflictResolver(); - - if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { - GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context(); - - // Start DHT managers. - for (GridCacheManager mgr : dhtManagers(dhtCtx)) - mgr.start(dhtCtx); - - dhtCtx.initConflictResolver(); - - // Start DHT cache. - dhtCtx.cache().start(); - - if (log.isDebugEnabled()) - log.debug("Started DHT cache: " + dhtCtx.cache().name()); - } - - ctx.continuous().onCacheStart(cacheCtx); - - cacheCtx.cache().start(); - - ctx.query().onCacheStart(cacheCtx, schema); - - cacheCtx.onStarted(); - - String memPlcName = cfg.getDataRegionName(); - - if (memPlcName == null && ctx.config().getDataStorageConfiguration() != null) - memPlcName = ctx.config().getDataStorageConfiguration().getDefaultDataRegionConfiguration().getName(); - - if (log.isInfoEnabled()) { - log.info("Started cache [name=" + cfg.getName() + - ", id=" + cacheCtx.cacheId() + - (cfg.getGroupName() != null ? 
", group=" + cfg.getGroupName() : "") + - ", memoryPolicyName=" + memPlcName + - ", mode=" + cfg.getCacheMode() + - ", atomicity=" + cfg.getAtomicityMode() + - ", backups=" + cfg.getBackups() + - ", mvcc=" + cacheCtx.mvccEnabled() +']' + - ", encryptionEnabled=" + cfg.isEncryptionEnabled() +']'); - } - } - - /** * @param cache Cache to stop. * @param cancel Cancel flag. * @param destroy Destroy data flag. Setting to <code>true</code> will remove all cache data. @@ -1600,7 +1546,13 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor GridCacheDrManager drMgr = pluginMgr.createComponent(GridCacheDrManager.class); CacheStoreManager storeMgr = pluginMgr.createComponent(CacheStoreManager.class); - storeMgr.initialize(cfgStore, sesHolders); + if (cfgStore == null) + storeMgr.initialize(cfgStore, sesHolders); + else + initializationProtector.protect( + cfgStore, + () -> storeMgr.initialize(cfgStore, sesHolders) + ); GridCacheContext<?, ?> cacheCtx = new GridCacheContext( ctx, @@ -2017,18 +1969,13 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor IgniteInternalFuture<?> res = sharedCtx.affinity().initCachesOnLocalJoin( locJoinCtx.cacheGroupDescriptors(), locJoinCtx.cacheDescriptors()); - for (T2<DynamicCacheDescriptor, NearCacheConfiguration> t : locJoinCtx.caches()) { - DynamicCacheDescriptor desc = t.get1(); + List<StartCacheInfo> startCacheInfos = locJoinCtx.caches().stream() + .map(cacheInfo -> new StartCacheInfo(cacheInfo.get1(), cacheInfo.get2(), exchTopVer, false)) + .collect(Collectors.toList()); - prepareCacheStart( - desc.cacheConfiguration(), - desc, - t.get2(), - exchTopVer, - false); + prepareStartCaches(startCacheInfos); - context().exchange().exchangerUpdateHeartbeat(); - } + context().exchange().exchangerUpdateHeartbeat(); if (log.isInfoEnabled()) log.info("Starting caches on local join performed in " + (System.currentTimeMillis() - time) + " ms."); @@ -2054,24 +2001,164 @@ public class 
GridCacheProcessor extends GridProcessorAdapter implements Metastor */ public Collection<DynamicCacheDescriptor> startReceivedCaches(UUID nodeId, AffinityTopologyVersion exchTopVer) throws IgniteCheckedException { - List<DynamicCacheDescriptor> started = cachesInfo.cachesReceivedFromJoin(nodeId); + List<DynamicCacheDescriptor> receivedCaches = cachesInfo.cachesReceivedFromJoin(nodeId); - for (DynamicCacheDescriptor desc : started) { - IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter(); + List<StartCacheInfo> startCacheInfos = receivedCaches.stream() + .filter(desc -> isLocalAffinity(desc.groupDescriptor().config())) + .map(desc -> new StartCacheInfo(desc, null, exchTopVer, false)) + .collect(Collectors.toList()); - if (CU.affinityNode(ctx.discovery().localNode(), filter)) { - prepareCacheStart( - desc.cacheConfiguration(), - desc, - null, - exchTopVer, - false); + prepareStartCaches(startCacheInfos); + + return receivedCaches; + } + + /** + * @param cacheConfiguration Checked configuration. + * @return {@code true} if local node is affinity node for cache. + */ + private boolean isLocalAffinity(CacheConfiguration cacheConfiguration) { + return CU.affinityNode(ctx.discovery().localNode(), cacheConfiguration.getNodeFilter()); + } + + /** + * Start all input caches in parallel. + * + * @param startCacheInfos All caches information for start. + */ + void prepareStartCaches(Collection<StartCacheInfo> startCacheInfos) throws IgniteCheckedException { + prepareStartCaches(startCacheInfos, (data, operation) -> { + operation.accept(data);// PROXY + }); + } + + /** + * Trying to start all input caches in parallel and skip failed caches. + * + * @param startCacheInfos Caches info for start. + * @return Caches which was failed. + * @throws IgniteCheckedException if failed. 
+ */ + Map<StartCacheInfo, IgniteCheckedException> prepareStartCachesIfPossible(Collection<StartCacheInfo> startCacheInfos) throws IgniteCheckedException { + HashMap<StartCacheInfo, IgniteCheckedException> failedCaches = new HashMap<>(); + + prepareStartCaches(startCacheInfos, (data, operation) -> { + try { + operation.accept(data); + } + catch (IgniteInterruptedCheckedException e) { + throw e; + } + catch (IgniteCheckedException e) { + log.warning("Cache can not be started : cache=" + data.getStartedConfiguration().getName()); + + failedCaches.put(data, e); + } + }); + + return failedCaches; + } + + /** + * Start all input caches in parallel. + * + * @param startCacheInfos All caches information for start. + * @param cacheStartFailHandler Fail handler for one cache start. + */ + private void prepareStartCaches( + Collection<StartCacheInfo> startCacheInfos, + StartCacheFailHandler<StartCacheInfo> cacheStartFailHandler + ) throws IgniteCheckedException { + if (!IGNITE_ALLOW_START_CACHES_IN_PARALLEL || startCacheInfos.size() <= 1) { + for (StartCacheInfo startCacheInfo : startCacheInfos) { + cacheStartFailHandler.handle( + startCacheInfo, + cacheInfo -> prepareCacheStart( + cacheInfo.getCacheDescriptor().cacheConfiguration(), + cacheInfo.getCacheDescriptor(), + cacheInfo.getReqNearCfg(), + cacheInfo.getExchangeTopVer(), + cacheInfo.isDisabledAfterStart() + ) + ); context().exchange().exchangerUpdateHeartbeat(); } } + else { + Map<StartCacheInfo, GridCacheContext> cacheContexts = new ConcurrentHashMap<>(); + + int parallelismLvl = sharedCtx.kernalContext().config().getSystemThreadPoolSize(); + + // Reserve at least 2 threads for system operations. 
+ parallelismLvl = Math.max(1, parallelismLvl - 2); + + doInParallel( + parallelismLvl, + sharedCtx.kernalContext().getSystemExecutorService(), + startCacheInfos, + startCacheInfo -> + cacheStartFailHandler.handle( + startCacheInfo, + cacheInfo -> { + GridCacheContext cacheCtx = prepareCacheContext( + cacheInfo.getCacheDescriptor().cacheConfiguration(), + cacheInfo.getCacheDescriptor(), + cacheInfo.getReqNearCfg(), + cacheInfo.getExchangeTopVer(), + cacheInfo.isDisabledAfterStart() + ); + cacheContexts.put(cacheInfo, cacheCtx); + + context().exchange().exchangerUpdateHeartbeat(); + } + ) + ); - return started; + /* + * This hack required because we can't start sql schema in parallel by folowing reasons: + * * checking index to duplicate(and other checking) require one order on every nodes. + * * onCacheStart and createSchema contains a lot of mutex. + * + * TODO IGNITE-9729 + */ + Set<StartCacheInfo> successfullyPreparedCaches = cacheContexts.keySet(); + + List<StartCacheInfo> cacheInfosInOriginalOrder = startCacheInfos.stream() + .filter(successfullyPreparedCaches::contains) + .collect(Collectors.toList()); + + for (StartCacheInfo startCacheInfo : cacheInfosInOriginalOrder) { + cacheStartFailHandler.handle( + startCacheInfo, + cacheInfo -> { + ctx.query().onCacheStart( + cacheContexts.get(cacheInfo), + cacheInfo.getCacheDescriptor().schema() != null + ? 
cacheInfo.getCacheDescriptor().schema() + : new QuerySchema() + ); + + context().exchange().exchangerUpdateHeartbeat(); + } + ); + } + + doInParallel( + parallelismLvl, + sharedCtx.kernalContext().getSystemExecutorService(), + cacheContexts.entrySet(), + cacheCtxEntry -> + cacheStartFailHandler.handle( + cacheCtxEntry.getKey(), + cacheInfo -> { + onCacheStarted(cacheCtxEntry.getValue()); + + context().exchange().exchangerUpdateHeartbeat(); + } + ) + ); + } } /** @@ -2090,71 +2177,156 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor AffinityTopologyVersion exchTopVer, boolean disabledAfterStart ) throws IgniteCheckedException { + GridCacheContext cacheCtx = prepareCacheContext(startCfg, desc, reqNearCfg, exchTopVer, disabledAfterStart); + + ctx.query().onCacheStart(cacheCtx, desc.schema() != null ? desc.schema() : new QuerySchema()); + + onCacheStarted(cacheCtx); + } + + /** + * Preparing cache context to start. + * + * @param startCfg Cache configuration to use. + * @param desc Cache descriptor. + * @param reqNearCfg Near configuration if specified for client cache start request. + * @param exchTopVer Current exchange version. + * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will change + * state of proxies to restarting + * @return Created {@link GridCacheContext}. + * @throws IgniteCheckedException if failed. 
+ */ + private GridCacheContext prepareCacheContext( + CacheConfiguration startCfg, + DynamicCacheDescriptor desc, + @Nullable NearCacheConfiguration reqNearCfg, + AffinityTopologyVersion exchTopVer, + boolean disabledAfterStart + ) throws IgniteCheckedException { assert !caches.containsKey(startCfg.getName()) : startCfg.getName(); CacheConfiguration ccfg = new CacheConfiguration(startCfg); CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - boolean affNode; + boolean affNode = checkForAffinityNode(desc, reqNearCfg, ccfg); - if (ccfg.getCacheMode() == LOCAL) { - affNode = true; + preparePageStore(desc, affNode); + + CacheGroupContext grp = prepareCacheGroup(desc, exchTopVer, cacheObjCtx, affNode, startCfg.getGroupName()); + + GridCacheContext cacheCtx = createCache(ccfg, + grp, + null, + desc, + exchTopVer, + cacheObjCtx, + affNode, + true, + disabledAfterStart + ); + initCacheContext(cacheCtx, ccfg, desc.deploymentId()); + + return cacheCtx; + } + + /** + * Check for affinity node and customize near configuration if needed. + * + * @param desc Cache descriptor. + * @param reqNearCfg Near configuration if specified for client cache start request. + * @param ccfg Cache configuration to use. + * @return {@code true} if it is affinity node for cache. 
+ */ + private boolean checkForAffinityNode( + DynamicCacheDescriptor desc, + @Nullable NearCacheConfiguration reqNearCfg, + CacheConfiguration ccfg + ) { + if (ccfg.getCacheMode() == LOCAL) { ccfg.setNearConfiguration(null); - } - else if (CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter())) - affNode = true; - else { - affNode = false; - ccfg.setNearConfiguration(reqNearCfg); + return true; } - if (sharedCtx.pageStore() != null && affNode) - sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData()); - - String grpName = startCfg.getGroupName(); + if (isLocalAffinity(desc.groupDescriptor().config())) + return true; - CacheGroupContext grp = null; + ccfg.setNearConfiguration(reqNearCfg); - if (grpName != null) { - for (CacheGroupContext grp0 : cacheGrps.values()) { - if (grp0.sharedGroup() && grpName.equals(grp0.name())) { - grp = grp0; + return false; + } - break; - } - } + /** + * Prepare page store for start cache. + * + * @param desc Cache descriptor. + * @param affNode {@code true} if it is affinity node for cache. + * @throws IgniteCheckedException if failed. + */ + private void preparePageStore(DynamicCacheDescriptor desc, boolean affNode) throws IgniteCheckedException { + if (sharedCtx.pageStore() != null && affNode) + initializationProtector.protect( + desc.groupDescriptor().groupId(), + () -> sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData()) + ); + } - if (grp == null) { - grp = startCacheGroup(desc.groupDescriptor(), + /** + * Prepare cache group to start cache. + * + * @param desc Cache descriptor. + * @param exchTopVer Current exchange version. + * @param cacheObjCtx Cache object context. + * @param affNode {@code true} if it is affinity node for cache. + * @param grpName Group name. + * @return Prepared cache group context. + * @throws IgniteCheckedException if failed. 
+ */ + private CacheGroupContext prepareCacheGroup( + DynamicCacheDescriptor desc, + AffinityTopologyVersion exchTopVer, + CacheObjectContext cacheObjCtx, + boolean affNode, + String grpName + ) throws IgniteCheckedException { + if (grpName != null) { + return initializationProtector.protect( + desc.groupId(), + () -> findCacheGroup(grpName), + () -> startCacheGroup( + desc.groupDescriptor(), desc.cacheType(), affNode, cacheObjCtx, - exchTopVer); - } - } - else { - grp = startCacheGroup(desc.groupDescriptor(), - desc.cacheType(), - affNode, - cacheObjCtx, - exchTopVer); + exchTopVer + ) + ); } - GridCacheContext cacheCtx = createCache(ccfg, - grp, - null, - desc, - exchTopVer, - cacheObjCtx, + return startCacheGroup(desc.groupDescriptor(), + desc.cacheType(), affNode, - true, - disabledAfterStart + cacheObjCtx, + exchTopVer ); + } - cacheCtx.dynamicDeploymentId(desc.deploymentId()); + /** + * Initialize created cache context. + * + * @param cacheCtx Cache context to initializtion. + * @param cfg Cache configuration. + * @param deploymentId Dynamic deployment ID. + * @throws IgniteCheckedException if failed. + */ + private void initCacheContext( + GridCacheContext<?, ?> cacheCtx, + CacheConfiguration cfg, + IgniteUuid deploymentId + ) throws IgniteCheckedException { + cacheCtx.dynamicDeploymentId(deploymentId); GridCacheAdapter cache = cacheCtx.cache(); @@ -2162,7 +2334,67 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor caches.put(cacheCtx.name(), cache); - startCache(cache, desc.schema() != null ? desc.schema() : new QuerySchema()); + // Intentionally compare Boolean references using '!=' below to check if the flag has been explicitly set. 
+ if (cfg.isStoreKeepBinary() && cfg.isStoreKeepBinary() != CacheConfiguration.DFLT_STORE_KEEP_BINARY + && !(ctx.config().getMarshaller() instanceof BinaryMarshaller)) + U.warn(log, "CacheConfiguration.isStoreKeepBinary() configuration property will be ignored because " + + "BinaryMarshaller is not used"); + + // Start managers. + for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx)))) + mgr.start(cacheCtx); + + cacheCtx.initConflictResolver(); + + if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { + GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context(); + + // Start DHT managers. + for (GridCacheManager mgr : dhtManagers(dhtCtx)) + mgr.start(dhtCtx); + + dhtCtx.initConflictResolver(); + + // Start DHT cache. + dhtCtx.cache().start(); + + if (log.isDebugEnabled()) + log.debug("Started DHT cache: " + dhtCtx.cache().name()); + } + + ctx.continuous().onCacheStart(cacheCtx); + + cacheCtx.cache().start(); + } + + /** + * Handle of cache context which was fully prepared. + * + * @param cacheCtx Fully prepared context. + * @throws IgniteCheckedException if failed. + */ + private void onCacheStarted(GridCacheContext cacheCtx) throws IgniteCheckedException { + GridCacheAdapter cache = cacheCtx.cache(); + CacheConfiguration cfg = cacheCtx.config(); + CacheGroupContext grp = cacheGrps.get(cacheCtx.groupId()); + + cacheCtx.onStarted(); + + String dataRegion = cfg.getDataRegionName(); + + if (dataRegion == null && ctx.config().getDataStorageConfiguration() != null) + dataRegion = ctx.config().getDataStorageConfiguration().getDefaultDataRegionConfiguration().getName(); + + if (log.isInfoEnabled()) { + log.info("Started cache [name=" + cfg.getName() + + ", id=" + cacheCtx.cacheId() + + (cfg.getGroupName() != null ? 
", group=" + cfg.getGroupName() : "") + + ", dataRegionName=" + dataRegion + + ", mode=" + cfg.getCacheMode() + + ", atomicity=" + cfg.getAtomicityMode() + + ", backups=" + cfg.getBackups() + + ", mvcc=" + cacheCtx.mvccEnabled() + ']'); + } grp.onCacheStarted(cacheCtx); @@ -2170,6 +2402,17 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor } /** + * @param grpName Group name. + * @return Found group or null. + */ + private CacheGroupContext findCacheGroup(String grpName) { + return cacheGrps.values().stream() + .filter(grp -> grp.sharedGroup() && grpName.equals(grp.name())) + .findAny() + .orElse(null); + } + + /** * Restarts proxies of caches if they was marked as restarting. Requires external synchronization - shouldn't be * called concurrently with another caches restart. */ @@ -4839,7 +5082,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor // Check if we were asked to start a near cache. if (nearCfg != null) { - if (CU.affinityNode(ctx.discovery().localNode(), descCfg.getNodeFilter())) { + if (isLocalAffinity(descCfg)) { // If we are on a data node and near cache was enabled, return success, else - fail. if (descCfg.getNearConfiguration() != null) return null; @@ -4851,7 +5094,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor // If local node has near cache, return success. req.clientStartOnly(true); } - else if (!CU.affinityNode(ctx.discovery().localNode(), descCfg.getNodeFilter())) + else if (!isLocalAffinity(descCfg)) req.clientStartOnly(true); req.deploymentId(desc.deploymentId()); @@ -5020,6 +5263,22 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor } /** + * Handle of fail during cache start. + * + * @param <T> Type of started data. + */ + private static interface StartCacheFailHandler<T> { + /** + * Handle of fail. + * + * @param data Start data. + * @param startCacheOperation Operation for start cache. 
+ * @throws IgniteCheckedException if failed. + */ + void handle(T data, IgniteThrowableConsumer<T> startCacheOperation) throws IgniteCheckedException; + } + + /** * */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java new file mode 100644 index 0000000..a5aea26 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * Specific cache information for start. + */ +public class StartCacheInfo { + /** Cache configuration for start. */ + private final CacheConfiguration startedConf; + + /** Cache descriptor for start. */ + private final DynamicCacheDescriptor desc; + + /** Near cache configuration for start. */ + private final @Nullable NearCacheConfiguration reqNearCfg; + + /** Exchange topology version in which starting happened. */ + private final AffinityTopologyVersion exchTopVer; + + /** Disable started cache after start or not. */ + private final boolean disabledAfterStart; + + /** + * @param desc Cache configuration for start. + * @param reqNearCfg Near cache configuration for start. + * @param exchTopVer Exchange topology version in which starting happened. + * @param disabledAfterStart Disable started cache after start or not. + */ + public StartCacheInfo(DynamicCacheDescriptor desc, + NearCacheConfiguration reqNearCfg, + AffinityTopologyVersion exchTopVer, boolean disabledAfterStart) { + this(desc.cacheConfiguration(), desc, reqNearCfg, exchTopVer, disabledAfterStart); + } + + /** + * @param conf Cache configuration for start. + * @param desc Cache descriptor for start. + * @param reqNearCfg Near cache configuration for start. + * @param exchTopVer Exchange topology version in which starting happened. + * @param disabledAfterStart Disable started cache after start or not. 
+ */ + public StartCacheInfo(CacheConfiguration conf, DynamicCacheDescriptor desc, + NearCacheConfiguration reqNearCfg, + AffinityTopologyVersion exchTopVer, boolean disabledAfterStart) { + startedConf = conf; + this.desc = desc; + this.reqNearCfg = reqNearCfg; + this.exchTopVer = exchTopVer; + this.disabledAfterStart = disabledAfterStart; + } + + /** + * @return Cache configuration for start. + */ + public CacheConfiguration getStartedConfiguration() { + return startedConf; + } + + /** + * @return Cache descriptor for start. + */ + public DynamicCacheDescriptor getCacheDescriptor() { + return desc; + } + + /** + * @return Near cache configuration for start. + */ + @Nullable public NearCacheConfiguration getReqNearCfg() { + return reqNearCfg; + } + + /** + * @return Exchange topology version in which starting happened. + */ + public AffinityTopologyVersion getExchangeTopVer() { + return exchTopVer; + } + + /** + * @return Disable started cache after start or not. + */ + public boolean isDisabledAfterStart() { + return disabledAfterStart; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartCacheInfo.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d494857..0c2cbe2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -3512,36 +3512,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte U.doInParallel( cctx.kernalContext().getSystemExecutorService(), nonLocalCacheGroupDescriptors(), - new IgniteInClosureX<CacheGroupDescriptor>() { - @Override public void applyx(CacheGroupDescriptor grpDesc) { - CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId()); - - GridDhtPartitionTopology top = grpCtx != null - ? grpCtx.topology() - : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache()); - - // Do not validate read or write through caches or caches with disabled rebalance - // or ExpiryPolicy is set or validation is disabled. - if (grpCtx == null - || grpCtx.config().isReadThrough() - || grpCtx.config().isWriteThrough() - || grpCtx.config().getCacheStoreFactory() != null - || grpCtx.config().getRebalanceDelay() == -1 - || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE - || grpCtx.config().getExpiryPolicyFactory() == null - || SKIP_PARTITION_SIZE_VALIDATION) - return; + grpDesc -> { + CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId()); + + GridDhtPartitionTopology top = grpCtx != null + ? grpCtx.topology() + : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache()); + + // Do not validate read or write through caches or caches with disabled rebalance + // or ExpiryPolicy is set or validation is disabled. 
+ if (grpCtx == null + || grpCtx.config().isReadThrough() + || grpCtx.config().isWriteThrough() + || grpCtx.config().getCacheStoreFactory() != null + || grpCtx.config().getRebalanceDelay() == -1 + || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE + || grpCtx.config().getExpiryPolicyFactory() == null + || SKIP_PARTITION_SIZE_VALIDATION) + return; - try { - validator.validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture.this, top, msgs); - } - catch (IgniteCheckedException ex) { - log.warning("Partition states validation has failed for group: " + grpCtx.cacheOrGroupName() + ". " + ex.getMessage()); - // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833 - } + try { + validator.validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture.this, top, msgs); } - }, - null); + catch (IgniteCheckedException ex) { + log.warning("Partition states validation has failed for group: " + grpCtx.cacheOrGroupName() + ". " + ex.getMessage()); + // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833 + } + } + ); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to validate partitions state", e); @@ -3561,21 +3559,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte U.doInParallel( cctx.kernalContext().getSystemExecutorService(), nonLocalCacheGroupDescriptors(), - new IgniteInClosureX<CacheGroupDescriptor>() { - @Override public void applyx(CacheGroupDescriptor grpDesc) { - CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId()); + grpDesc -> { + CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId()); - GridDhtPartitionTopology top = grpCtx != null - ? grpCtx.topology() - : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache()); + GridDhtPartitionTopology top = grpCtx != null + ? 
grpCtx.topology() + : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache()); - if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration())) - assignPartitionSizes(top); - else - assignPartitionStates(top); - } - }, - null); + if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration())) + assignPartitionSizes(top); + else + assignPartitionStates(top); + } + ); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to assign partition states", e); http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index e6f374a..2fe0eb8 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -142,6 +142,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; import java.util.zip.ZipInputStream; @@ -216,19 +217,16 @@ import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryNativeLoader; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridPeerDeployAware; import org.apache.ignite.internal.util.lang.GridTuple; -import org.apache.ignite.internal.util.lang.IgniteInClosureX; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import 
org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFutureCancelledException; @@ -236,6 +234,7 @@ import org.apache.ignite.lang.IgniteFutureTimeoutException; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.marshaller.Marshaller; @@ -10739,54 +10738,91 @@ public abstract class IgniteUtils { } /** + * Execute operation on data in parallel. + * * @param executorSvc Service for parallel execution. * @param srcDatas List of data for parallelization. - * @param consumer Logic for execution of on each item of data. - * @param errHnd Optionan error handler. If not {@code null}, an error of each item execution will be passed to - * this handler. If error handler is not {@code null}, the exception will not be thrown from this method. + * @param operation Logic for execution of on each item of data. * @param <T> Type of data. - * @return List of (item, execution future) tuples. - * @throws IgniteCheckedException If parallel execution failed and {@code errHnd} is {@code null}. + * @throws IgniteCheckedException if parallel execution was failed. 
*/ - public static <T> List<T2<T, Future<Object>>> doInParallel( + public static <T> void doInParallel(ExecutorService executorSvc, Collection<T> srcDatas, + IgniteThrowableConsumer<T> operation) throws IgniteCheckedException, IgniteInterruptedCheckedException { + doInParallel(srcDatas.size(), executorSvc, srcDatas, operation); + } + + /** + * Execute operation on data in parallel. + * + * @param parallelismLvl Number of threads on which it should be executed. + * @param executorSvc Service for parallel execution. + * @param srcDatas List of data for parallelization. + * @param operation Logic for execution of on each item of data. + * @param <T> Type of data. + * @throws IgniteCheckedException if parallel execution was failed. + */ + public static <T> void doInParallel( + int parallelismLvl, ExecutorService executorSvc, Collection<T> srcDatas, - IgniteInClosureX<T> consumer, - @Nullable IgniteBiInClosure<T, Throwable> errHnd - ) throws IgniteCheckedException { - List<T2<T, Future<Object>>> consumerFutures = srcDatas.stream() - .map(item -> new T2<>( - item, - executorSvc.submit(() -> { - consumer.apply(item); + IgniteThrowableConsumer<T> operation + ) throws IgniteCheckedException, IgniteInterruptedCheckedException { + List<List<T>> batches = IntStream.range(0, parallelismLvl) + .mapToObj(i -> new ArrayList<T>()) + .collect(Collectors.toList()); - return null; - }))) + int i = 0; + + for (T src : srcDatas) + batches.get(i++ % parallelismLvl).add(src); + + List<Future<Object>> consumerFutures = batches.stream() + .filter(batch -> !batch.isEmpty()) + .map(batch -> executorSvc.submit(() -> { + for (T item : batch) + operation.accept(item); + + return null; + })) .collect(Collectors.toList()); - IgniteCheckedException composite = null; + Throwable error =null; - for (T2<T, Future<Object>> tup : consumerFutures) { + for (Future<Object> future : consumerFutures) { try { - getUninterruptibly(tup.get2()); + future.get(); } - catch (ExecutionException e) { - if (errHnd != 
null) - errHnd.apply(tup.get1(), e.getCause()); - else { - if (composite == null) - composite = new IgniteCheckedException("Failed to execute one of the tasks " + - "(see suppressed exception for details)"); + catch (InterruptedException e) { + Thread.currentThread().interrupt(); - composite.addSuppressed(e.getCause()); - } + throw new IgniteInterruptedCheckedException(e); + } + catch (ExecutionException e) { + if(error == null) + error = e.getCause(); + else + error.addSuppressed(e.getCause()); + } + catch (CancellationException e) { + if(error == null) + error = e; + else + error.addSuppressed(e); } } - if (composite != null) - throw composite; + if (error != null) { + if (error instanceof IgniteCheckedException) + throw (IgniteCheckedException)error; - return consumerFutures; + if (error instanceof RuntimeException) + throw (RuntimeException)error; + + if (error instanceof Error) + throw (Error)error; + + throw new IgniteCheckedException(error); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java b/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java new file mode 100644 index 0000000..7c501c4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.lang.IgniteThrowableRunner; + +/** + * Class for avoid multiple initialization of specific value from various threads. + */ +public class InitializationProtector { + /** Default striped lock concurrency level. */ + private static final int DEFAULT_CONCURRENCY_LEVEL = Runtime.getRuntime().availableProcessors(); + + /** Striped lock. */ + private GridStripedLock stripedLock = new GridStripedLock(DEFAULT_CONCURRENCY_LEVEL); + + /** + * @param protectedKey Unique value by which initialization code should be run only one time. + * @param initializedVal Supplier for given already initialized value if it exist or null as sign that + * initialization required. + * @param initializationCode Code for initialization value corresponding protectedKey. Should be idempotent. + * @param <T> Type of initialization value. + * @return Initialized value. + * @throws IgniteCheckedException if initialization was failed. 
+ */ + public <T> T protect(Object protectedKey, Supplier<T> initializedVal, + IgniteThrowableRunner initializationCode) throws IgniteCheckedException { + T value = initializedVal.get(); + + if (value != null) + return value; + + Lock lock = stripedLock.getLock(protectedKey.hashCode() % stripedLock.concurrencyLevel()); + + lock.lock(); + try { + value = initializedVal.get(); + + if (value != null) + return value; + + initializationCode.run(); + + return initializedVal.get(); + } + finally { + lock.unlock(); + } + } + + /** + * This method prevents simultaneous initialization from multiple threads. + * + * @param protectedKey Unique value by which initialization code should be run by only one thread at a time. + * @param initializationCode Code that initializes the value corresponding to protectedKey. Should be idempotent. + * @throws IgniteCheckedException if initialization failed. + */ + public void protect(Object protectedKey, IgniteThrowableRunner initializationCode) throws IgniteCheckedException { + protect(protectedKey, () -> null, initializationCode); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java new file mode 100644 index 0000000..46813a9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.lang; + +import java.io.Serializable; +import org.apache.ignite.IgniteCheckedException; + +/** + * Represents an operation that accepts a single input argument and returns no result. Unlike most other functional + * interfaces, {@code IgniteThrowableConsumer} is expected to operate via side-effects. + * + * @param <E> Type of closure parameter. + */ +public interface IgniteThrowableConsumer<E> extends Serializable { + /** + * Consumer body. + * + * @param e Consumer parameter. + * @throws IgniteCheckedException if body execution was failed. + */ + public void accept(E e) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java new file mode 100644 index 0000000..a5c95e1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.lang; + +import org.apache.ignite.IgniteCheckedException; + +/** + * Represents a throwable runner. + */ +public interface IgniteThrowableRunner { + /** + * Execute a body. + */ + void run() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java new file mode 100644 index 0000000..4e30d1c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.ArrayList; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class CacheStartInParallelTest extends GridCommonAbstractTest { + /** */ + private static final int CACHES_COUNT = 40; + + /** */ + private static final String STATIC_CACHE_PREFIX = "static-cache-"; + + /** */ + private static final String DYNAMIC_CACHE_PREFIX = "dynamic-cache-"; + + /** */ + private static boolean isStaticCache = true; + + /** */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + 
+ cfg.setSystemThreadPoolSize(10); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + long sz = 100 * 1024 * 1024; + + DataStorageConfiguration memCfg = new DataStorageConfiguration().setPageSize(1024) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(true).setInitialSize(sz).setMaxSize(sz)) + .setWalMode(WALMode.LOG_ONLY).setCheckpointFrequency(24L * 60 * 60 * 1000); + + cfg.setDataStorageConfiguration(memCfg); + + if (isStaticCache) { + ArrayList<Object> staticCaches = new ArrayList<>(CACHES_COUNT); + + for (int i = 0; i < CACHES_COUNT; i++) + staticCaches.add(cacheConfiguration(STATIC_CACHE_PREFIX + i)); + + cfg.setCacheConfiguration(staticCaches.toArray(new CacheConfiguration[CACHES_COUNT])); + } + + return cfg; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String cacheName) { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setName(cacheName); + cfg.setBackups(1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanupTestData(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + cleanupTestData(); + } + + /** */ + private void cleanupTestData() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + System.clearProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL); + + isStaticCache = true; + } + + /** + * Checks that starting static caches in parallel is faster than starting them sequentially. + * + * @throws Exception if failed. + */ + public void testParallelizationAcceleratesStartOfStaticCaches() throws Exception { + //start caches sequentially.
+ System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "false"); + + long startTime = System.currentTimeMillis(); + + IgniteEx igniteEx = startGrid(0); + + igniteEx.cluster().active(true); + + long totalStartTimeConsistently = System.currentTimeMillis() - startTime; + + //check cache started. + for (int i = 0; i < CACHES_COUNT; i++) + igniteEx.cache(STATIC_CACHE_PREFIX + i).put(i, i); + + stopAllGrids(); + + //start caches in parallel. + System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "true"); + + startTime = System.currentTimeMillis(); + + igniteEx = startGrid(0); + + igniteEx.cluster().active(true); + + long totalStartTimeInParallel = System.currentTimeMillis() - startTime; + + for (int i = 0; i < CACHES_COUNT; i++) + igniteEx.cache(STATIC_CACHE_PREFIX + i).put(i, i); + + stopAllGrids(); + + assertTrue("Consistently cache stat time : " + totalStartTimeConsistently + + "Parallelization cache stat time : " + totalStartTimeInParallel, + totalStartTimeConsistently > totalStartTimeInParallel); + } + + /** + * Checks that starting dynamic caches in parallel is faster than starting them sequentially. + * + * @throws Exception if failed. + */ + public void testParallelizationAcceleratesStartOfCaches2() throws Exception { + //prepare dynamic caches. + isStaticCache = false; + + IgniteEx igniteEx = startGrid(0); + + igniteEx.cluster().active(true); + + for (int i = 0; i < CACHES_COUNT; i++) + igniteEx.getOrCreateCache(DYNAMIC_CACHE_PREFIX + i); + + stopAllGrids(); + + //start caches sequentially. + System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "false"); + + igniteEx = startGrid(0); + long startTime = System.currentTimeMillis(); + + igniteEx.cluster().active(true); + + long totalStartTimeConsistently = System.currentTimeMillis() - startTime; + + for (int i = 0; i < CACHES_COUNT; i++) + igniteEx.cache(DYNAMIC_CACHE_PREFIX + i); + + stopAllGrids(); + + //start caches in parallel.
+ System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "true"); + + startTime = System.currentTimeMillis(); + + igniteEx = startGrid(0); + + igniteEx.cluster().active(true); + + long totalStartTimeInParallel = System.currentTimeMillis() - startTime; + + for (int i = 0; i < CACHES_COUNT; i++) + igniteEx.cache(DYNAMIC_CACHE_PREFIX + i).put(i, i); + + stopAllGrids(); + + assertTrue("Consistently cache stat time : " + totalStartTimeConsistently + + "Parallelization cache stat time : " + totalStartTimeInParallel, + totalStartTimeConsistently > totalStartTimeInParallel); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java index 870ce67..66453b8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java @@ -17,15 +17,15 @@ package org.apache.ignite.internal.processors.cache.distributed; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; import java.util.Collection; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; -import javax.cache.Cache; -import javax.cache.configuration.Factory; -import javax.cache.integration.CacheLoaderException; -import 
javax.cache.integration.CacheWriterException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.store.CacheStore; @@ -61,7 +61,7 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { CacheConfiguration cfg3 = cacheConfiguration("cacheC", new SecondStoreFactory()); CacheConfiguration cfg4 = cacheConfiguration("cacheD", null); - cfg.setCacheConfiguration(cfg1, cfg2, cfg3, cfg4); + cfg.setCacheConfiguration(cfg4, cfg2, cfg3, cfg1); return cfg; } @@ -92,6 +92,8 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); + cleanPersistenceDir(); + startGrids(4); } @@ -160,8 +162,8 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { "delete cacheA", "write cacheB", "sessionEnd true" - ), - firstStoreEvts); + ), + firstStoreEvts); assertEquals(0, secondStoreEvts.size()); } @@ -209,16 +211,16 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { "write cacheA", "delete cacheA", "sessionEnd true" - ), - firstStoreEvts); + ), + firstStoreEvts); assertEqualsCollections(F.asList( "writeAll cacheC 2", "deleteAll cacheC 2", "write cacheC", "sessionEnd true" - ), - secondStoreEvts); + ), + secondStoreEvts); } /** @@ -264,8 +266,8 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { "write cacheA", "delete cacheA", "sessionEnd true" - ), - firstStoreEvts); + ), + firstStoreEvts); assertEquals(0, secondStoreEvts.size()); } @@ -361,15 +363,10 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { private Ignite ignite; /** {@inheritDoc} */ - @Override public CacheStore create() { + @Override public synchronized CacheStore create() { String igniteInstanceName = ignite.name(); - CacheStore store = firstStores.get(igniteInstanceName); - - if (store == null) - store = 
F.addIfAbsent(firstStores, igniteInstanceName, new TestStore()); - - return store; + return firstStores.computeIfAbsent(igniteInstanceName, (k) -> new TestStore()); } } @@ -384,12 +381,7 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { @Override public CacheStore create() { String igniteInstanceName = ignite.name(); - CacheStore store = secondStores.get(igniteInstanceName); - - if (store == null) - store = F.addIfAbsent(secondStores, igniteInstanceName, new TestStore()); - - return store; + return secondStores.computeIfAbsent(igniteInstanceName, (k) -> new TestStore()); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java index 61a076e..13a1044 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java @@ -44,6 +44,10 @@ import java.util.Collections; import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterGroup; @@ -877,6 +881,76 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest { } /** + * + */ + public void testDoInParallel() throws Throwable { + CyclicBarrier barrier = new CyclicBarrier(3); + + IgniteUtils.doInParallel(3, + Executors.newFixedThreadPool(3), + Arrays.asList(1, 2, 3), + i -> { + try { + 
barrier.await(1, TimeUnit.SECONDS); + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } + ); + } + + /** + * + */ + public void testDoInParallelBatch() { + CyclicBarrier barrier = new CyclicBarrier(3); + + try { + IgniteUtils.doInParallel(2, + Executors.newFixedThreadPool(3), + Arrays.asList(1, 2, 3), + i -> { + try { + barrier.await(400, TimeUnit.MILLISECONDS); + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } + ); + + fail("Should throw timeout exception"); + } + catch (Exception e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + } + + /** + * + */ + public void testDoInParallelException() { + String expectedException = "ExpectedException"; + + try { + IgniteUtils.doInParallel(3, + Executors.newFixedThreadPool(1), + Arrays.asList(1, 2, 3), + i -> { + if (i == 1) + throw new IgniteCheckedException(expectedException); + } + ); + + fail("Should throw ParallelExecutionException"); + } + catch (IgniteCheckedException e) { + assertEquals(expectedException, e.getMessage()); + } + } + + /** * Test enum. 
*/ private enum TestEnum { http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java index d0734a8..0381a1f 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartition import org.apache.ignite.internal.processors.cache.distributed.CacheDataLossOnPartitionMoveTest; import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest; import org.apache.ignite.internal.processors.cache.distributed.CacheRentingStateRepairTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheStartInParallelTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheStartWithLoadTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingPartitionCountersTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingWithAsyncClearingTest; @@ -99,6 +100,8 @@ public class IgniteCacheTestSuite7 extends TestSuite { suite.addTestSuite(CacheRentingStateRepairTest.class); + suite.addTestSuite(CacheStartInParallelTest.class); + suite.addTestSuite(TransactionIntegrityWithPrimaryIndexCorruptionTest.class); suite.addTestSuite(CacheDataLossOnPartitionMoveTest.class);
