http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index dc22669..8282b0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -34,6 +34,12 @@ import org.jetbrains.annotations.Nullable; */ public class ExchangeActions { /** */ + private List<CacheGroupDescriptor> cacheGrpsToStart; + + /** */ + private List<CacheGroupDescriptor> cacheGrpsToStop; + + /** */ private Map<String, ActionData> cachesToStart; /** */ @@ -57,6 +63,8 @@ public class ExchangeActions { boolean clientOnlyExchange() { return F.isEmpty(cachesToStart) && F.isEmpty(cachesToStop) && + F.isEmpty(cacheGrpsToStart) && + F.isEmpty(cacheGrpsToStop) && F.isEmpty(cachesToResetLostParts); } @@ -64,8 +72,8 @@ public class ExchangeActions { * @param nodeId Local node ID. * @return Close cache requests. */ - List<DynamicCacheChangeRequest> closeRequests(UUID nodeId) { - List<DynamicCacheChangeRequest> res = null; + List<ActionData> closeRequests(UUID nodeId) { + List<ActionData> res = null; if (cachesToClose != null) { for (ActionData req : cachesToClose.values()) { @@ -73,12 +81,12 @@ public class ExchangeActions { if (res == null) res = new ArrayList<>(cachesToClose.size()); - res.add(req.req); + res.add(req); } } } - return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList(); + return res != null ? res : Collections.<ActionData>emptyList(); } /** @@ -289,6 +297,74 @@ public class ExchangeActions { } /** + * @param grpDesc Group descriptor. 
+ */ + void addCacheGroupToStart(CacheGroupDescriptor grpDesc) { + assert grpDesc != null; + + if (cacheGrpsToStart == null) + cacheGrpsToStart = new ArrayList<>(); + + cacheGrpsToStart.add(grpDesc); + } + + /** + * @return Cache groups to start. + */ + public List<CacheGroupDescriptor> cacheGroupsToStart() { + return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupDescriptor>emptyList(); + } + + /** + * @param grpId Group ID. + * @return {@code True} if given cache group starting. + */ + public boolean cacheGroupStarting(int grpId) { + if (cacheGrpsToStart != null) { + for (CacheGroupDescriptor grp : cacheGrpsToStart) { + if (grp.groupId() == grpId) + return true; + } + } + + return false; + } + + /** + * @param grpDesc Group descriptor. + */ + public void addCacheGroupToStop(CacheGroupDescriptor grpDesc) { + assert grpDesc != null; + + if (cacheGrpsToStop == null) + cacheGrpsToStop = new ArrayList<>(); + + cacheGrpsToStop.add(grpDesc); + } + + /** + * @return Cache groups to stop. + */ + public List<CacheGroupDescriptor> cacheGroupsToStop() { + return cacheGrpsToStop != null ? cacheGrpsToStop : Collections.<CacheGroupDescriptor>emptyList(); + } + + /** + * @param grpId Group ID. + * @return {@code True} if given cache group stopping. + */ + public boolean cacheGroupStopping(int grpId) { + if (cacheGrpsToStop != null) { + for (CacheGroupDescriptor grp : cacheGrpsToStop) { + if (grp.groupId() == grpId) + return true; + } + } + + return false; + } + + /** + * @return {@code True} if there are no cache change actions. */ public boolean empty() { @@ -296,6 +372,8 @@ public class ExchangeActions { F.isEmpty(clientCachesToStart) && F.isEmpty(cachesToStop) && F.isEmpty(cachesToClose) && + F.isEmpty(cacheGrpsToStart) && + F.isEmpty(cacheGrpsToStop) && F.isEmpty(cachesToResetLostParts); }
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index a84d093..2ee6c33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -303,15 +303,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ @SuppressWarnings("OverriddenMethodCallDuringObjectConstruction") protected GridCacheAdapter(GridCacheContext<K, V> ctx) { - this(ctx, DFLT_START_CACHE_SIZE); - } - - /** - * @param ctx Cache context. - * @param startSize Start size. - */ - @SuppressWarnings("OverriddenMethodCallDuringObjectConstruction") - protected GridCacheAdapter(GridCacheContext<K, V> ctx, int startSize) { this(ctx, null); } @@ -390,7 +381,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param e Map entry. */ public void incrementSize(GridCacheMapEntry e) { - map.incrementPublicSize(e); + map.incrementPublicSize(null, e); } /** @@ -398,7 +389,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param e Map entry. */ public void decrementSize(GridCacheMapEntry e) { - map.decrementPublicSize(e); + map.decrementPublicSize(null, e); } /** @@ -550,21 +541,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * @return Entry factory. - */ - protected abstract GridCacheMapEntryFactory entryFactory(); - - /** * Starts this cache. Child classes should override this method * to provide custom start-up behavior. 
* * @throws IgniteCheckedException If start failed. */ - public void start() throws IgniteCheckedException { - if (map == null) { - map = new GridCacheLocalConcurrentMap(ctx, entryFactory(), DFLT_START_CACHE_SIZE); - } - } + public abstract void start() throws IgniteCheckedException; /** * Startup info. @@ -718,7 +700,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteCacheOffheapManager offheapMgr = ctx.isNear() ? ctx.near().dht().context().offheap() : ctx.offheap(); - its.add(offheapMgr.<K, V>entriesIterator(modes.primary, modes.backup, topVer, ctx.keepBinary())); + its.add(offheapMgr.<K, V>cacheEntriesIterator(ctx, modes.primary, modes.backup, topVer, ctx.keepBinary())); } } else if (modes.heap) { @@ -945,7 +927,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Entry (never {@code null}). */ public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) { - GridCacheEntryEx e = map.putEntryIfObsoleteOrAbsent(topVer, key, true, false); + GridCacheEntryEx e = map.putEntryIfObsoleteOrAbsent(ctx, topVer, key, true, false); assert e != null; @@ -961,10 +943,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ @Nullable private GridCacheEntryEx entry0(KeyCacheObject key, AffinityTopologyVersion topVer, boolean create, boolean touch) { - GridCacheMapEntry cur = map.getEntry(key); + GridCacheMapEntry cur = map.getEntry(ctx, key); if (cur == null || cur.obsolete()) { cur = map.putEntryIfObsoleteOrAbsent( + ctx, topVer, key, create, touch); @@ -984,7 +967,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Set of internal cached entry representations. */ public final Iterable<? 
extends GridCacheEntryEx> allEntries() { - return map.entries(); + return map.entries(ctx.cacheId()); } /** {@inheritDoc} */ @@ -994,7 +977,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public final Set<K> keySet() { - return new KeySet(map.entrySet()); + return new KeySet(map.entrySet(ctx.cacheId())); } /** @@ -1004,7 +987,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V public final void removeIfObsolete(KeyCacheObject key) { assert key != null; - GridCacheMapEntry entry = map.getEntry(key); + GridCacheMapEntry entry = map.getEntry(ctx, key); if (entry != null && entry.obsolete()) removeEntry(entry); @@ -1913,7 +1896,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V boolean skipEntry = readNoEntry; if (readNoEntry) { - CacheDataRow row = ctx.offheap().read(key); + CacheDataRow row = ctx.offheap().read(ctx, key); if (row != null) { long expireTime = row.expireTime(); @@ -2908,7 +2891,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V List<K> keys = new ArrayList<>(Math.min(REMOVE_ALL_KEYS_BATCH, size())); do { - for (Iterator<CacheDataRow> it = ctx.offheap().iterator(true, true, null); + for (Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(), true, true, null); it.hasNext() && keys.size() < REMOVE_ALL_KEYS_BATCH; ) keys.add((K)it.next().key()); @@ -3711,7 +3694,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V try { return ctx.kernalContext().task().execute( new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null); - } catch (ClusterGroupEmptyException e) { + } + catch (ClusterGroupEmptyException e) { return new GridFinishedFuture<>(0); } } @@ -3773,12 +3757,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public int size() { - return 
map.publicSize(); + return map.publicSize(ctx.cacheId()); } /** {@inheritDoc} */ @Override public long sizeLong() { - return map.publicSize(); + return map.publicSize(ctx.cacheId()); } /** {@inheritDoc} */ @@ -3788,12 +3772,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public int primarySize() { - return map.publicSize(); + return map.publicSize(ctx.cacheId()); } /** {@inheritDoc} */ @Override public long primarySizeLong() { - return map.publicSize(); + return map.publicSize(ctx.cacheId()); } /** {@inheritDoc} */ @@ -3910,7 +3894,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V try { IgniteCacheOffheapManager mgr = ctx.offheap(); - return mgr != null ? mgr.entriesCount(false, true, ctx.affinity().affinityTopologyVersion()) : -1; + return mgr != null ? mgr.cacheEntriesCount(ctx.cacheId(), + false, + true, + ctx.affinity().affinityTopologyVersion()) : -1; } catch (IgniteCheckedException ignore) { return 0; @@ -4432,7 +4419,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V public Set<Cache.Entry<K, V>> entrySet(@Nullable CacheEntryPredicate... 
filter) { boolean keepBinary = ctx.keepBinary(); - return new EntrySet(map.entrySet(filter), keepBinary); + return new EntrySet(map.entrySet(ctx.cacheId(), filter), keepBinary); } /** @@ -6504,7 +6491,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public boolean contains(Object o) { - GridCacheMapEntry entry = map.getEntry(ctx.toCacheKeyObject(o)); + GridCacheMapEntry entry = map.getEntry(ctx, ctx.toCacheKeyObject(o)); return entry != null && internalSet.contains(entry); } @@ -6594,7 +6581,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public boolean contains(Object o) { - GridCacheMapEntry entry = map.getEntry(ctx.toCacheKeyObject(o)); + GridCacheMapEntry entry = map.getEntry(ctx, ctx.toCacheKeyObject(o)); return entry != null && internalSet.contains(entry); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 2bb6f6c..702b848 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -28,14 +28,12 @@ import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import 
org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; /** @@ -63,12 +61,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { affFunction = cctx.config().getAffinity(); affMapper = cctx.config().getAffinityMapper(); - aff = new GridAffinityAssignmentCache(cctx.kernalContext(), - cctx.name(), - affFunction, - cctx.config().getNodeFilter(), - cctx.config().getBackups(), - cctx.isLocal()); + aff = cctx.group().affinity(); } /** {@inheritDoc} */ @@ -79,42 +72,6 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { } /** {@inheritDoc} */ - @Override protected void onKernalStop0(boolean cancel) { - cancelFutures(); - } - - /** - * - */ - public void cancelFutures() { - if (!starting.get()) - // Ignoring attempt to stop manager that has never been started. - return; - - IgniteCheckedException err = - new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."); - - if (aff != null) - aff.cancelFutures(err); - } - - /** {@inheritDoc} */ - @Override public void onDisconnected(IgniteFuture reconnectFut) { - IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, - "Failed to wait for topology update, client disconnected."); - - if (aff != null) - aff.cancelFutures(err); - } - - /** - * - */ - public void onReconnected() { - aff.onReconnected(); - } - - /** {@inheritDoc} */ @Override protected void stop0(boolean cancel, boolean destroy) { aff = null; } @@ -443,23 +400,6 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { } /** - * Dumps debug information. 
- */ - public void dumpDebugInfo() { - GridAffinityAssignmentCache aff0 = aff; - - if (aff0 != null) - aff0.dumpDebugInfo(); - } - - /** - * @return Affinity cache. - */ - public GridAffinityAssignmentCache affinityCache() { - return aff; - } - - /** * @param part Partition. * @param startVer Start version. * @param endVer End version. http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java index 4af2518..7d97159 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java @@ -52,6 +52,7 @@ public class GridCacheAttributes implements Serializable { /** * @param cfg Cache configuration. + * @param sql SQL flag. */ public GridCacheAttributes(CacheConfiguration cfg, boolean sql) { ccfg = cfg; @@ -60,10 +61,10 @@ public class GridCacheAttributes implements Serializable { } /** - * Public no-arg constructor for {@link Externalizable}. + * @return Cache group name. */ - public GridCacheAttributes() { - // No-op. + public String groupName() { + return ccfg.getGroupName(); } /** @@ -278,6 +279,20 @@ public class GridCacheAttributes implements Serializable { } /** + * @return Node filter class name. + */ + String nodeFilterClassName() { + return className(ccfg.getNodeFilter()); + } + + /** + * @return Topology validator class name. + */ + String topologyValidatorClassName() { + return className(ccfg.getTopologyValidator()); + } + + /** * @return SQL flag. 
*/ public boolean sql() { http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java index df19225..d37cecb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java @@ -82,7 +82,7 @@ public class GridCacheClearAllRunnable<K, V> implements Runnable { if (!ctx.isNear()) { if (id == 0) - ctx.offheap().clear(readers); + ctx.offheap().clearCache(ctx, readers); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java index 9f20d64..282faaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java @@ -18,7 +18,10 @@ package org.apache.ignite.internal.processors.cache; import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; /** @@ -27,15 +30,17 @@ 
import org.jetbrains.annotations.Nullable; public interface GridCacheConcurrentMap { /** * Returns the entry associated with the specified key in the - * HashMap. Returns null if the HashMap contains no mapping + * HashMap. Returns null if the HashMap contains no mapping * for this key. * + * @param ctx Cache context. * @param key Key. * @return Entry. */ - @Nullable public GridCacheMapEntry getEntry(KeyCacheObject key); + @Nullable public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key); /** + * @param ctx Cache context. * @param topVer Topology version. * @param key Key. * @param create Create flag. @@ -44,6 +49,7 @@ public interface GridCacheConcurrentMap { * couldn't be created. */ @Nullable public GridCacheMapEntry putEntryIfObsoleteOrAbsent( + GridCacheContext ctx, AffinityTopologyVersion topVer, KeyCacheObject key, boolean create, @@ -70,39 +76,66 @@ public interface GridCacheConcurrentMap { * It excludes entries that are marked as deleted. * It also does not include entries from underlying data store. * + * @param cacheId Cache ID. * @return the number of publicly available key-value mappings in this map. */ - public int publicSize(); + public int publicSize(int cacheId); /** * Increments public size. * * @param e Entry that caused public size change. + * @param hld Cache map (passed as optimization to avoid cache map lookup for shared groups). */ - public void incrementPublicSize(GridCacheEntryEx e); + public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e); /** * Decrements public size. * * @param e Entry that caused public size change. + * @param hld Cache map (passed as optimization to avoid cache map lookup for shared groups). */ - public void decrementPublicSize(GridCacheEntryEx e); + public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e); /** + * @param cacheId Cache ID. * @param filter Filter. 
* @return Iterable of the mappings contained in this map, excluding entries in unvisitable state. */ - public Iterable<GridCacheMapEntry> entries(CacheEntryPredicate... filter); + public Iterable<GridCacheMapEntry> entries(int cacheId, CacheEntryPredicate... filter); /** + * @param cacheId Cache ID. * @param filter Filter. - * @return Iterable of the mappings contained in this map, including entries in unvisitable state. + * @return Set of the mappings contained in this map. */ - public Iterable<GridCacheMapEntry> allEntries(CacheEntryPredicate... filter); + public Set<GridCacheMapEntry> entrySet(int cacheId, CacheEntryPredicate... filter); /** - * @param filter Filter. - * @return Set of the mappings contained in this map. + * */ - public Set<GridCacheMapEntry> entrySet(CacheEntryPredicate... filter); + static class CacheMapHolder { + /** */ + public final GridCacheContext cctx; + + /** */ + public final AtomicInteger size = new AtomicInteger(); + + /** */ + public final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map; + + /** + * @param cctx Cache context. + * @param map Map. 
+ */ + public CacheMapHolder(GridCacheContext cctx, ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map) { + this.cctx = cctx; + this.map = map; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheMapHolder.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java index b02a2b7..37f2a51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java @@ -20,15 +20,14 @@ package org.apache.ignite.internal.processors.cache; import java.util.AbstractSet; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.Set; -import java.util.concurrent.ConcurrentMap; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_CREATED; import static org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_DESTROYED; @@ -37,78 +36,49 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_DESTROYED; * Implementation of concurrent cache map. */ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentMap { - /** Default load factor. 
*/ - private static final float DFLT_LOAD_FACTOR = 0.75f; - - /** Default concurrency level. */ - private static final int DFLT_CONCUR_LEVEL = Runtime.getRuntime().availableProcessors() * 2; - - /** Internal map. */ - private final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map; - /** Map entry factory. */ private final GridCacheMapEntryFactory factory; - /** Cache context. */ - private final GridCacheContext ctx; - /** * Creates a new, empty map with the specified initial * capacity. * - * @param ctx Cache context. * @param factory Entry factory. - * @param initialCapacity the initial capacity. The implementation - * performs internal sizing to accommodate this many elements. * @throws IllegalArgumentException if the initial capacity is * negative. */ - public GridCacheConcurrentMapImpl(GridCacheContext ctx, GridCacheMapEntryFactory factory, int initialCapacity) { - this(ctx, factory, initialCapacity, DFLT_LOAD_FACTOR, DFLT_CONCUR_LEVEL); + public GridCacheConcurrentMapImpl(GridCacheMapEntryFactory factory) { + this.factory = factory; } - /** - * Creates a new, empty map with the specified initial - * capacity, load factor and concurrency level. - * - * @param ctx Cache context. - * @param factory Entry factory. - * @param initialCapacity the initial capacity. The implementation - * performs internal sizing to accommodate this many elements. - * @param loadFactor the load factor threshold, used to control resizing. - * Resizing may be performed when the average number of elements per - * bin exceeds this threshold. - * @param concurrencyLevel the estimated number of concurrently - * updating threads. The implementation performs internal sizing - * to try to accommodate this many threads. - * @throws IllegalArgumentException if the initial capacity is - * negative or the load factor or concurrencyLevel are - * non-positive. 
- */ - public GridCacheConcurrentMapImpl( - GridCacheContext ctx, - GridCacheMapEntryFactory factory, - int initialCapacity, - float loadFactor, - int concurrencyLevel - ) { - this.ctx = ctx; - this.factory = factory; + /** {@inheritDoc} */ + @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) { + CacheMapHolder hld = entriesMapIfExists(ctx.cacheIdBoxed()); - map = new ConcurrentHashMap8<>(initialCapacity, loadFactor, concurrencyLevel); + return hld != null ? hld.map.get(key) : null; } /** {@inheritDoc} */ - @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) { - return map.get(key); + @Nullable @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent( + GridCacheContext ctx, + final AffinityTopologyVersion topVer, + KeyCacheObject key, + final boolean create, + final boolean touch) { + return putEntryIfObsoleteOrAbsent(null, ctx, topVer, key, create, touch); } - /** {@inheritDoc} */ - @Nullable @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(final AffinityTopologyVersion topVer, + protected final GridCacheMapEntry putEntryIfObsoleteOrAbsent( + @Nullable CacheMapHolder hld, + GridCacheContext ctx, + final AffinityTopologyVersion topVer, KeyCacheObject key, final boolean create, final boolean touch) { + if (hld == null) + hld = entriesMapIfExists(ctx.cacheIdBoxed()); + GridCacheMapEntry cur = null; GridCacheMapEntry created = null; GridCacheMapEntry created0 = null; @@ -120,7 +90,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM try { while (!done) { - GridCacheMapEntry entry = map.get(key); + GridCacheMapEntry entry = hld != null ? 
hld.map.get(key) : null; created = null; doomed = null; @@ -134,12 +104,18 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM reserved = true; } + if (hld == null) { + hld = entriesMap(ctx); + + assert hld != null; + } + created0 = factory.create(ctx, topVer, key); } cur = created = created0; - done = map.putIfAbsent(created.key(), created) == null; + done = hld.map.putIfAbsent(created.key(), created) == null; } else done = true; @@ -162,10 +138,10 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM cur = created = created0; - done = map.replace(entry.key(), doomed, created); + done = hld.map.replace(entry.key(), doomed, created); } else - done = map.remove(entry.key(), doomed); + done = hld.map.remove(entry.key(), doomed); } else { cur = entry; @@ -231,18 +207,31 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM } finally { if (reserved) - release(sizeChange, cur); + release(sizeChange, hld, cur); else { if (sizeChange != 0) { assert sizeChange == -1; + assert doomed != null; - decrementPublicSize(cur); + decrementPublicSize(hld, doomed); } } } } /** + * @param cctx Cache context. + * @return Map for given cache ID. + */ + @Nullable protected abstract CacheMapHolder entriesMap(GridCacheContext cctx); + + /** + * @param cacheId Cache ID. + * @return Map for given cache ID. + */ + @Nullable protected abstract CacheMapHolder entriesMapIfExists(Integer cacheId); + + /** * */ protected boolean reserve() { @@ -258,63 +247,75 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM /** * @param sizeChange Size delta. + * @param hld Map holder. * @param e Map entry. 
*/ - protected void release(int sizeChange, GridCacheEntryEx e) { + protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) { if (sizeChange == 1) - incrementPublicSize(e); + incrementPublicSize(hld, e); else if (sizeChange == -1) - decrementPublicSize(e); + decrementPublicSize(hld, e); } /** {@inheritDoc} */ @Override public boolean removeEntry(final GridCacheEntryEx entry) { - boolean removed = map.remove(entry.key(), entry); + GridCacheContext ctx = entry.context(); + + CacheMapHolder hld = entriesMapIfExists(ctx.cacheIdBoxed()); + + boolean rmv = hld != null && hld.map.remove(entry.key(), entry); - if (removed) { - if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED)) + if (rmv) { + if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED)) { // Event notification. - ctx.events().addEvent(entry.partition(), entry.key(), ctx.localNodeId(), (IgniteUuid)null, null, - EVT_CACHE_ENTRY_DESTROYED, null, false, null, false, null, null, null, false); + ctx.events().addEvent(entry.partition(), + entry.key(), + ctx.localNodeId(), + (IgniteUuid)null, + null, + EVT_CACHE_ENTRY_DESTROYED, + null, + false, + null, + false, + null, + null, + null, + false); + } synchronized (entry) { if (!entry.deleted()) - decrementPublicSize(entry); + decrementPublicSize(hld, entry); } } - return removed; + return rmv; } /** {@inheritDoc} */ - @Override public int internalSize() { - return map.size(); - } + @Override public Collection<GridCacheMapEntry> entries(int cacheId, final CacheEntryPredicate... filter) { + CacheMapHolder hld = entriesMapIfExists(cacheId); + + if (hld == null) + return Collections.emptyList(); - /** {@inheritDoc} */ - @Override public Collection<GridCacheMapEntry> entries(final CacheEntryPredicate... 
filter) { final IgnitePredicate<GridCacheMapEntry> p = new IgnitePredicate<GridCacheMapEntry>() { @Override public boolean apply(GridCacheMapEntry entry) { return entry.visitable(filter); } }; - return F.viewReadOnly(map.values(), F.<GridCacheMapEntry>identity(), p); + return F.viewReadOnly(hld.map.values(), F.<GridCacheMapEntry>identity(), p); } /** {@inheritDoc} */ - @Override public Collection<GridCacheMapEntry> allEntries(final CacheEntryPredicate... filter) { - final IgnitePredicate<GridCacheMapEntry> p = new IgnitePredicate<GridCacheMapEntry>() { - @Override public boolean apply(GridCacheMapEntry entry) { - return F.isAll(entry, filter); - } - }; + @Override public Set<GridCacheMapEntry> entrySet(int cacheId, final CacheEntryPredicate... filter) { + final CacheMapHolder hld = entriesMapIfExists(cacheId); - return F.viewReadOnly(map.values(), F.<GridCacheMapEntry>identity(), p); - } + if (hld == null) + return Collections.emptySet(); - /** {@inheritDoc} */ - @Override public Set<GridCacheMapEntry> entrySet(final CacheEntryPredicate... 
filter) { final IgnitePredicate<GridCacheMapEntry> p = new IgnitePredicate<GridCacheMapEntry>() { @Override public boolean apply(GridCacheMapEntry entry) { return entry.visitable(filter); @@ -323,7 +324,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM return new AbstractSet<GridCacheMapEntry>() { @Override public Iterator<GridCacheMapEntry> iterator() { - return F.iterator0(map.values(), true, p); + return F.iterator0(hld.map.values(), true, p); } @Override public int size() { @@ -336,7 +337,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM GridCacheMapEntry entry = (GridCacheMapEntry)o; - return entry.equals(map.get(entry.key())) && p.apply(entry); + return entry.equals(hld.map.get(entry.key())) && p.apply(entry); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 190e952..44cf4e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -45,6 +45,7 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteKernal; @@ -56,8 +57,6 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.database.MemoryPolicy; -import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; -import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; @@ -84,7 +83,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEnt import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.plugin.CachePluginManager; -import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.lang.GridFunc; @@ -106,9 +104,10 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheRebalanceMode.NONE; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; /** @@ -135,14 +134,8 @@ public class GridCacheContext<K, V> implements Externalizable 
{ /** Cache shared context. */ private GridCacheSharedContext<K, V> sharedCtx; - /** Memory policy. */ - private MemoryPolicy memPlc; - - /** FreeList instance this cache is associated with. */ - private FreeList freeList; - - /** ReuseList instance this cache is associated with */ - private ReuseList reuseList; + /** Cache group. */ + private CacheGroupContext grp; /** Logger. */ private IgniteLogger log; @@ -177,9 +170,6 @@ public class GridCacheContext<K, V> implements Externalizable { /** Replication manager. */ private GridCacheDrManager drMgr; - /** */ - private IgniteCacheOffheapManager offheapMgr; - /** Conflict resolver manager. */ private CacheConflictResolutionManager rslvrMgr; @@ -210,6 +200,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Cache ID. */ private int cacheId; + /** Cache ID. */ + private Integer cacheIdBoxed; + /** Cache type. */ private CacheType cacheType; @@ -237,18 +230,12 @@ public class GridCacheContext<K, V> implements Externalizable { /** Topology version when cache was started on local node. */ private AffinityTopologyVersion locStartTopVer; - /** */ - private UUID rcvdFrom; - /** Dynamic cache deployment ID. */ private IgniteUuid dynamicDeploymentId; /** Updates allowed flag. */ private boolean updatesAllowed; - /** Flag indicating that this cache is in a recovery mode. */ - private boolean needsRecovery; - /** Deployment enabled flag for this specific cache */ private boolean depEnabled; @@ -258,6 +245,12 @@ public class GridCacheContext<K, V> implements Externalizable { /** */ private boolean customAffMapper; + /** Whether {@link EventType#EVT_CACHE_REBALANCE_STARTED} was sent (used only for REPLICATED cache). */ + private volatile boolean rebalanceStartedEvtSent; + + /** Whether {@link EventType#EVT_CACHE_REBALANCE_STOPPED} was sent (used only for REPLICATED cache). */ + private volatile boolean rebalanceStoppedEvtSent; + /** * Empty constructor required for {@link Externalizable}. 
*/ @@ -269,9 +262,9 @@ public class GridCacheContext<K, V> implements Externalizable { * @param ctx Kernal context. * @param sharedCtx Cache shared context. * @param cacheCfg Cache configuration. + * @param grp Cache group. * @param cacheType Cache type. - * @param memPlc MemoryPolicy instance. - * @param freeList FreeList instance. + * @param locStartTopVer Topology version when cache was started on local node. * @param affNode {@code True} if local node is affinity node. * @param updatesAllowed Updates allowed flag. * @param evtMgr Cache event manager. @@ -291,14 +284,11 @@ public class GridCacheContext<K, V> implements Externalizable { GridKernalContext ctx, GridCacheSharedContext sharedCtx, CacheConfiguration cacheCfg, + CacheGroupContext grp, CacheType cacheType, AffinityTopologyVersion locStartTopVer, - UUID rcvdFrom, boolean affNode, boolean updatesAllowed, - MemoryPolicy memPlc, - FreeList freeList, - ReuseList reuseList, /* * Managers in starting order! @@ -313,7 +303,6 @@ public class GridCacheContext<K, V> implements Externalizable { CacheDataStructuresManager dataStructuresMgr, GridCacheTtlManager ttlMgr, GridCacheDrManager drMgr, - IgniteCacheOffheapManager offheapMgr, CacheConflictResolutionManager<K, V> rslvrMgr, CachePluginManager pluginMgr, GridCacheAffinityManager affMgr @@ -323,6 +312,7 @@ public class GridCacheContext<K, V> implements Externalizable { assert cacheCfg != null; assert locStartTopVer != null : cacheCfg.getName(); + assert grp != null; assert evtMgr != null; assert storeMgr != null; assert evictMgr != null; @@ -333,22 +323,17 @@ public class GridCacheContext<K, V> implements Externalizable { assert ttlMgr != null; assert rslvrMgr != null; assert pluginMgr != null; - assert offheapMgr != null; this.ctx = ctx; this.sharedCtx = sharedCtx; this.cacheCfg = cacheCfg; + this.grp = grp; this.cacheType = cacheType; this.locStartTopVer = locStartTopVer; - this.rcvdFrom = rcvdFrom; this.affNode = affNode; this.updatesAllowed = updatesAllowed; 
this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg); - this.memPlc = memPlc; - this.freeList = freeList; - this.reuseList = reuseList; - /* * Managers in starting order! * =========================== @@ -361,7 +346,6 @@ public class GridCacheContext<K, V> implements Externalizable { this.dataStructuresMgr = add(dataStructuresMgr); this.ttlMgr = add(ttlMgr); this.drMgr = add(drMgr); - this.offheapMgr = add(offheapMgr); this.rslvrMgr = add(rslvrMgr); this.pluginMgr = add(pluginMgr); this.affMgr = add(affMgr); @@ -374,6 +358,8 @@ public class GridCacheContext<K, V> implements Externalizable { cacheId = CU.cacheId(cacheName); + cacheIdBoxed = cacheId; + plc = cacheType.ioPolicy(); Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory(); @@ -387,6 +373,20 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return Cache group ID. + */ + public int groupId() { + return grp.groupId(); + } + + /** + * @return Cache group. + */ + public CacheGroupContext group() { + return grp; + } + + /** * @return {@code True} if custom {@link AffinityKeyMapper} is configured for cache. */ public boolean customAffinityMapper() { @@ -460,13 +460,6 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * @return Node ID cache was received from. - */ - public UUID receivedFrom() { - return rcvdFrom; - } - - /** * @return Topology version when cache was started on local node. */ public AffinityTopologyVersion startTopologyVersion() { @@ -525,6 +518,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return Cache ID. + */ + public Integer cacheIdBoxed() { + return cacheIdBoxed; + } + + /** * @return {@code True} if should use system transactions which are isolated from user transactions. */ public boolean systemTx() { @@ -721,21 +721,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Memory policy. 
*/ public MemoryPolicy memoryPolicy() { - return memPlc; - } - - /** - * @return Free List. - */ - public FreeList freeList() { - return freeList; - } - - /** - * @return Reuse List. - */ - public ReuseList reuseList() { - return reuseList; + return grp.memoryPolicy(); } /** @@ -775,7 +761,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if rebalance is enabled. */ public boolean rebalanceEnabled() { - return cacheCfg.getRebalanceMode() != NONE; + return grp.rebalanceEnabled(); } /** @@ -854,6 +840,18 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return DHT cache. + */ + public GridDhtCacheAdapter dhtCache() { + GridCacheAdapter<K, V> cache = this.cache; + + if (cache == null) + throw new IllegalStateException("Cache stopped: " + cacheName); + + return isNear() ? ((GridNearCacheAdapter<K, V>)cache).dht() : dht(); + } + + /** * @return Topology version future. */ public GridDhtTopologyFuture topologyVersionFuture() { @@ -1073,7 +1071,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Offheap manager. */ public IgniteCacheOffheapManager offheap() { - return offheapMgr; + return grp.offheap(); } /** @@ -1945,20 +1943,6 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * @return Current cache state. Must only be modified during exchange. - */ - public boolean needsRecovery() { - return needsRecovery; - } - - /** - * @param needsRecovery Needs recovery flag. - */ - public void needsRecovery(boolean needsRecovery) { - this.needsRecovery = needsRecovery; - } - - /** * Nulling references to potentially leak-prone objects. 
*/ public void cleanup() { @@ -2027,7 +2011,6 @@ public class GridCacheContext<K, V> implements Externalizable { topology().partitionState(localNodeId(), part) == OWNING : "result = " + result + ", persistenceEnabled = " + ctx.cache().context().database().persistenceEnabled() + ", partitionState = " + topology().partitionState(localNodeId(), part); - ; return result; } @@ -2045,13 +2028,6 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * @return {@code True} if fast eviction is allowed. - */ - public boolean allowFastEviction() { - return shared().database().persistenceEnabled() && !QueryUtils.isEnabled(cacheCfg); - } - - /** * @param part Partition. * @param affNodes Affinity nodes. * @param topVer Topology version. @@ -2066,6 +2042,35 @@ public class GridCacheContext<K, V> implements Externalizable { || (top.partitionState(localNodeId(), part) == OWNING); } + /** + * @param type Event type. + * @return {@code True} if event should be recorded. + */ + public boolean recordEvent(int type) { + if (isReplicated()) { + if (type == EVT_CACHE_REBALANCE_STARTED) { + if (!rebalanceStartedEvtSent) { + rebalanceStartedEvtSent = true; + + return true; + } + else + return false; + } + else if (type == EVT_CACHE_REBALANCE_STOPPED) { + if (!rebalanceStoppedEvtSent) { + rebalanceStoppedEvtSent = true; + + return true; + } + else + return false; + } + } + + return true; + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, igniteInstanceName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java index 
c0e1c55..7371153 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java @@ -317,17 +317,17 @@ public class GridCacheEntryInfo implements Message { } /** + * @param ctx Cache object context. * @return Marshalled size. + * @throws IgniteCheckedException If failed. */ - public int marshalledSize(GridCacheContext ctx) throws IgniteCheckedException { + public int marshalledSize(CacheObjectContext ctx) throws IgniteCheckedException { int size = 0; - CacheObjectContext cacheObjCtx = ctx.cacheObjectContext(); - if (val != null) - size += val.valueBytes(cacheObjCtx).length; + size += val.valueBytes(ctx).length; - size += key.valueBytes(cacheObjCtx).length; + size += key.valueBytes(ctx).length; return SIZE_OVERHEAD + size; } @@ -337,12 +337,20 @@ public class GridCacheEntryInfo implements Message { * @throws IgniteCheckedException In case of error. */ public void marshal(GridCacheContext ctx) throws IgniteCheckedException { + marshal(ctx.cacheObjectContext()); + } + + /** + * @param ctx Cache context. + * @throws IgniteCheckedException In case of error. + */ + public void marshal(CacheObjectContext ctx) throws IgniteCheckedException { assert key != null; - key.prepareMarshal(ctx.cacheObjectContext()); + key.prepareMarshal(ctx); if (val != null) - val.prepareMarshal(ctx.cacheObjectContext()); + val.prepareMarshal(ctx); if (expireTime == 0) expireTime = -1; @@ -362,10 +370,21 @@ public class GridCacheEntryInfo implements Message { * @throws IgniteCheckedException If unmarshalling failed. */ public void unmarshal(GridCacheContext ctx, ClassLoader clsLdr) throws IgniteCheckedException { - key.finishUnmarshal(ctx.cacheObjectContext(), clsLdr); + unmarshal(ctx.cacheObjectContext(), clsLdr); + } + + /** + * Unmarshalls entry. + * + * @param ctx Cache context. + * @param clsLdr Class loader. 
+ * @throws IgniteCheckedException If unmarshalling failed. + */ + public void unmarshal(CacheObjectContext ctx, ClassLoader clsLdr) throws IgniteCheckedException { + key.finishUnmarshal(ctx, clsLdr); if (val != null) - val.finishUnmarshal(ctx.cacheObjectContext(), clsLdr); + val.finishUnmarshal(ctx, clsLdr); long remaining = expireTime; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index 93c5950..a967305 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -368,42 +368,6 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { } /** - * Adds preloading event. - * - * @param part Partition. - * @param type Event type. - * @param discoNode Discovery node. - * @param discoType Discovery event type. - * @param discoTs Discovery event timestamp. - */ - public void addPreloadEvent(int part, int type, ClusterNode discoNode, int discoType, long discoTs) { - assert discoNode != null; - assert type > 0; - assert discoType > 0; - assert discoTs > 0; - - if (!cctx.events().isRecordable(type)) - LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type)); - - cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), cctx.localNode(), - "Cache rebalancing event.", type, part, discoNode, discoType, discoTs)); - } - - /** - * Adds partition unload event. - * - * @param part Partition. 
- */ - public void addUnloadEvent(int part) { - if (!cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) - LT.warn(log, "Added event without checking if event is recordable: " + - U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED)); - - cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), cctx.localNode(), - "Cache unloading event.", EVT_CACHE_REBALANCE_PART_UNLOADED, part, null, 0, 0)); - } - - /** * @param type Event type. * @return {@code True} if event is recordable. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java new file mode 100644 index 0000000..09c143b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Message related to particular cache group. + */ +public abstract class GridCacheGroupIdMessage extends GridCacheMessage { + /** Cache group ID. */ + @GridToStringInclude + protected int grpId; + + /** + * @return Cache group ID. + */ + public int groupId() { + return grpId; + } + + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return true; + } + + /** {@inheritDoc} */ + @Override public final int handlerId() { + return grpId; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 2: + if (!writer.writeInt("grpId", grpId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 2: + grpId = reader.readInt("grpId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridCacheGroupIdMessage.class); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheGroupIdMessage.class, this); + } +} 
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java new file mode 100644 index 0000000..6c20bdd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Message related to particular cache. + */ +public abstract class GridCacheIdMessage extends GridCacheMessage { + /** Cache ID. 
*/ + @GridToStringInclude + protected int cacheId; + + /** + * @return Cache ID. + */ + public int cacheId() { + return cacheId; + } + + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** + * @param cacheId Cache ID. + */ + public void cacheId(int cacheId) { + this.cacheId = cacheId; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 2: + if (!writer.writeInt("cacheId", cacheId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 2: + cacheId = reader.readInt("cacheId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridCacheIdMessage.class); + } + + /** {@inheritDoc} */ + @Override public int handlerId() { + return cacheId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheIdMessage.class, this); + } +}
