http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java deleted file mode 100644 index 274701f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht; - -import java.util.Map; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.util.lang.GridTriple; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; - -/** - * Empty cache map that will never store any entries. - */ -public class GridNoStorageCacheMap extends GridCacheConcurrentMap { - /** Empty triple. */ - private final GridTriple<GridCacheMapEntry> emptyTriple = - new GridTriple<>(null, null, null); - - /** - * @param ctx Cache context. - */ - public GridNoStorageCacheMap(GridCacheContext ctx) { - super(ctx, 0, null, 0.75f, 1); - } - - /** {@inheritDoc} */ - @Override public boolean isEmpty() { - return true; - } - - /** {@inheritDoc} */ - @Override public int size() { - return 0; - } - - /** {@inheritDoc} */ - @Override public int publicSize() { - return 0; - } - - /** {@inheritDoc} */ - @Override public boolean containsKey(Object key) { - return false; - } - - /** {@inheritDoc} */ - @Override public GridCacheMapEntry randomEntry() { - return null; - } - - /** {@inheritDoc} */ - @Override public GridCacheMapEntry getEntry(Object key) { - return null; - } - - /** {@inheritDoc} */ - @Override public GridCacheMapEntry putEntry(AffinityTopologyVersion topVer, - KeyCacheObject key, - @Nullable CacheObject val) - { - throw new AssertionError(); - } - - /** {@inheritDoc} */ - @Override public GridTriple<GridCacheMapEntry> putEntryIfObsoleteOrAbsent( - AffinityTopologyVersion topVer, - KeyCacheObject key, - @Nullable CacheObject val, - boolean create) - { - if (create) { - GridCacheMapEntry entry = ctx.useOffheapEntry() ? - new GridDhtOffHeapCacheEntry(ctx, topVer, key, hash(key.hashCode()), val) : - new GridDhtCacheEntry(ctx, topVer, key, hash(key.hashCode()), val); - - return new GridTriple<>(entry, null, null); - } - else - return emptyTriple; - } - - /** {@inheritDoc} */ - @Override public void putAll(Map<KeyCacheObject, CacheObject> m) { - throw new AssertionError(); - } - - /** {@inheritDoc} */ - @Override public boolean removeEntry(GridCacheEntryEx e) { - throw new AssertionError(); - } - - /** {@inheritDoc} */ - @Override public GridCacheMapEntry removeEntryIfObsolete(KeyCacheObject key) { - throw new AssertionError(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridNoStorageCacheMap.class, this); - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index e9873b7..9561ad8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -490,7 +490,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { if (isNew && entry.markObsoleteIfEmpty(ver)) - cache.removeIfObsolete(key); + cache.removeEntry(entry); } else { cctx.addResult(locVals, http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 62c287a..fd59f48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -414,7 +414,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { if (isNew && entry.markObsoleteIfEmpty(ver)) - colocated.removeIfObsolete(key); + colocated.removeEntry(entry); } else { if (!skipVals && cctx.config().isStatisticsEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d28aaaa..ee909e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -175,9 +175,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory() { - /** {@inheritDoc} */ + @Override protected GridCacheMapEntryFactory entryFactory() { + return new GridCacheMapEntryFactory() { @Override public GridCacheMapEntry create( GridCacheContext ctx, AffinityTopologyVersion topVer, @@ -190,7 +189,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val); } - }); + }; + } + + /** {@inheritDoc} */ + @Override protected void init() { + super.init(); updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { @SuppressWarnings("ThrowableResultOfMethodCallIgnored") @@ -330,16 +334,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean skipStore = opCtx != null && opCtx.skipStore(); - return getAsync0(ctx.toCacheKeyObject(key), - !ctx.config().isReadFromBackup(), - subjId, - taskName, - deserializeBinary, - expiryPlc, - false, - skipStore, - true, - needVer).get(); + try { + return getAsync0(ctx.toCacheKeyObject(key, true), + !ctx.config().isReadFromBackup(), + subjId, + taskName, + deserializeBinary, + expiryPlc, + false, + skipStore, + true, + needVer).get(); + } + catch (IgniteException e) { + if (e.getCause(IgniteCheckedException.class) != null) + throw e.getCause(IgniteCheckedException.class); + else + throw e; + } } /** {@inheritDoc} */ @@ -369,7 +381,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return asyncOp(new CO<IgniteInternalFuture<V>>() { @Override public IgniteInternalFuture<V> apply() { - return getAsync0(ctx.toCacheKeyObject(key), + return getAsync0(ctx.toCacheKeyObject(key, true), forcePrimary, subjId0, taskName, @@ -415,7 +427,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() { @Override public IgniteInternalFuture<Map<K, V>> apply() { - return getAllAsync0(ctx.cacheKeysView(keys), + return getAllAsync0(ctx.cacheKeysView(keys, true), forcePrimary, subjId0, taskName, @@ -808,13 +820,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, TRANSFORM); - return resFut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() { - @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) throws IgniteCheckedException { - Map<Object, EntryProcessorResult> resMap = (Map)fut.get(); + return resFut.chain( + new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() { + @Override public Map<K, EntryProcessorResult<T>> applyx( + IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut + ) throws IgniteCheckedException { + Map<Object, EntryProcessorResult> resMap = (Map)fut.get(); - return ctx.unwrapInvokeResult(resMap, keepBinary); - } - }); + return ctx.unwrapInvokeResult(resMap, keepBinary); + } + }); } /** {@inheritDoc} */ @@ -1372,7 +1387,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheVersion obsoleteVer = context().versions().next(); if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) - removeIfObsolete(key); + removeEntry(entry); success = false; } @@ -1952,9 +1967,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, req.keepBinary()); - Object val = ctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(ctx, entry.key(), - old, req.keepBinary()), - updated.value(ctx.cacheObjectContext(), false)); + Object val = ctx.config().getInterceptor().onBeforePut( + new CacheLazyEntry( + ctx, + entry.key(), + old, + req.keepBinary()), + updated.value( + ctx.cacheObjectContext(), + false)); if (val == null) continue; @@ -3329,7 +3350,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { respVers.add(ver); - if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true)) + if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true)) snd = true; } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index b5e2835..6b050b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -681,6 +681,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid if (nearEntryProcessors == null) nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr); } + + if (partIds != null && !partIds.isEmpty()) { + assert partIds.size() == keys.size(); + + for (int i = 0; i < keys.size(); i++) + keys.get(i).partition(partIds.get(i)); + } } /** {@inheritDoc} */ @@ -835,6 +842,12 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid writer.incrementState(); + case 25: + if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + } return true; @@ -1031,6 +1044,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); + case 25: + partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class); @@ -1056,7 +1076,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 25; + return 26; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index edebd8c..dfdf1a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -854,7 +854,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (val == null && op != GridCacheOperation.DELETE) continue; - KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key, true); if (remapKeys != null && !remapKeys.contains(cacheKey)) continue; @@ -969,12 +969,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (val == null && op != GridCacheOperation.DELETE) throw new NullPointerException("Null value."); - KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key, true); if (op != TRANSFORM) val = cctx.toCacheObject(val); - ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + ClusterNode primary = cctx.affinity().primary(cacheKey.partition(), topVer); if (primary == null) throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 375c02f..e847c7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -99,6 +99,10 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri @GridDirectCollection(CacheObject.class) private List<CacheObject> vals; + /** Partitions of keys. */ + @GridDirectCollection(int.class) + private List<Integer> partIds; + /** Entry processors. */ @GridDirectTransient private List<EntryProcessor<Object, Object, Object>> entryProcessors; @@ -246,6 +250,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri initSize = Math.min(maxEntryCnt, 10); keys = new ArrayList<>(initSize); + + partIds = new ArrayList<>(initSize); } /** {@inheritDoc} */ @@ -384,12 +390,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri if (op == TRANSFORM) { assert val instanceof EntryProcessor : val; - entryProcessor = (EntryProcessor<Object, Object, Object>) val; + entryProcessor = (EntryProcessor<Object, Object, Object>)val; } assert val != null || op == DELETE; keys.add(key); + partIds.add(key.partition()); if (entryProcessor != null) { if (entryProcessors == null) @@ -652,6 +659,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri if (expiryPlcBytes != null && expiryPlc == null) expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + + if (partIds != null && !partIds.isEmpty()) { + assert partIds.size() == keys.size(); + + for (int i = 0; i < keys.size(); i++) + keys.get(i).partition(partIds.get(i)); + } } /** {@inheritDoc} */ @@ -812,6 +826,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); + case 26: + if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); } return true; @@ -1020,6 +1039,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); + case 26: + partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridNearAtomicUpdateRequest.class); @@ -1048,7 +1075,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 26; + return 27; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 9c699fa..0ae434a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -116,9 +116,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** {@inheritDoc} */ - @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory() { - /** {@inheritDoc} */ + @Override protected GridCacheMapEntryFactory entryFactory() { + return new GridCacheMapEntryFactory() { @Override public GridCacheMapEntry create( GridCacheContext ctx, AffinityTopologyVersion topVer, @@ -131,7 +130,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte return new GridDhtColocatedCacheEntry(ctx, topVer, key, hash, val); } - }); + }; } /** {@inheritDoc} */ @@ -245,8 +244,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } AffinityTopologyVersion topVer = tx == null ? - (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) : - tx.topologyVersion(); + (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) : + tx.topologyVersion(); subjId = ctx.subjectIdPerCall(subjId, opCtx); @@ -526,7 +525,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte GridCacheVersion obsoleteVer = context().versions().next(); if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) - removeIfObsolete(key); + removeEntry(entry); success = false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index dc225cf..821d2e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -675,7 +675,7 @@ public class GridDhtPartitionDemander { "value, will ignore rebalance entries)"); if (cached.markObsoleteIfEmpty(null)) - cached.context().cache().removeIfObsolete(cached.key()); + cached.context().cache().removeEntry(cached); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index d301ba9..b082c47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -32,8 +32,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.util.lang.GridCloseableIterator; @@ -298,8 +298,8 @@ class GridDhtPartitionSupplier { phase = SupplyContextPhase.ONHEAP; if (phase == SupplyContextPhase.ONHEAP) { - Iterator<GridDhtCacheEntry> entIt = sctx != null ? - (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator(); + Iterator<GridCacheMapEntry> entIt = sctx != null ? + (Iterator<GridCacheMapEntry>)sctx.entryIt : loc.allEntries().iterator(); while (entIt.hasNext()) { if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { @@ -807,7 +807,7 @@ class GridDhtPartitionSupplier { boolean partMissing = false; - for (GridCacheEntryEx e : loc.entries()) { + for (GridCacheEntryEx e : loc.allEntries()) { if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { // Demander no longer needs this partition, so we send '-1' partition and move on. s.missed(part); http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 4b876b4..7971173 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -39,12 +39,10 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable; -import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; +import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheEntrySet; -import org.apache.ignite.internal.processors.cache.GridCacheKeySet; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCachePreloader; @@ -54,16 +52,14 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteBiPredicate; import org.jetbrains.annotations.NotNull; @@ -94,9 +90,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory() { - /** {@inheritDoc} */ + @Override protected GridCacheMapEntryFactory entryFactory() { + return new GridCacheMapEntryFactory() { @Override public GridCacheMapEntry create( GridCacheContext ctx, AffinityTopologyVersion topVer, @@ -111,7 +106,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda return new GridNearCacheEntry(ctx, key, hash, val); } - }); + }; } /** @@ -121,10 +116,10 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda /** {@inheritDoc} */ @Override public void onReconnected() { - map = new GridCacheConcurrentMap( + map = new GridCacheConcurrentMapImpl( ctx, - ctx.config().getNearConfiguration().getNearStartSize(), - map.getEntryFactory()); + entryFactory(), + ctx.config().getNearConfiguration().getNearStartSize()); } /** {@inheritDoc} */ @@ -138,7 +133,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public GridCacheEntryEx entryEx(KeyCacheObject key, boolean touch) { + @Override public GridCacheMapEntry entryEx(KeyCacheObject key, boolean touch) { GridNearCacheEntry entry = null; while (true) { @@ -157,7 +152,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) { + @Override public GridCacheMapEntry entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) { GridNearCacheEntry entry = null; while (true) { @@ -311,7 +306,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda /** {@inheritDoc} */ @Override public long sizeLong() { - return nearEntries().size() + dht().sizeLong(); + return nearEntries().size() + dht().size(); } /** {@inheritDoc} */ @@ -339,15 +334,23 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda @Override public boolean apply(GridCacheEntryEx entry) { GridNearCacheEntry nearEntry = (GridNearCacheEntry)entry; - return nearEntry.valid(topVer); + return !nearEntry.deleted() && nearEntry.visitable(CU.empty0()) && nearEntry.valid(topVer); } }); } /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> entrySet( - @Nullable CacheEntryPredicate... filter) { - return new EntrySet(super.entrySet(filter), dht().entrySet(filter)); + @Override public Set<Cache.Entry<K, V>> entrySet(@Nullable final CacheEntryPredicate... filter) { + CacheEntryPredicate p = new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx ex) { + if (ex instanceof GridCacheMapEntry) + return ((GridCacheMapEntry)ex).visitable(filter); + else + return !ex.deleted() && F.isAll(ex, filter); + } + }; + + return new EntrySet(super.entrySet(p), dht().entrySet(p)); } /** {@inheritDoc} */ @@ -355,49 +358,11 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda return dht().entrySet(part); } - /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> primaryEntrySet() { - final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - - Collection<Cache.Entry<K, V>> entries = - F.flatCollections( - F.viewReadOnly( - dht().topology().currentLocalPartitions(), - new C1<GridDhtLocalPartition, Collection<Cache.Entry<K, V>>>() { - @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition p) { - Collection<GridDhtCacheEntry> entries0 = p.entries(); - - return F.viewReadOnly( - entries0, - new C1<GridDhtCacheEntry, Cache.Entry<K, V>>() { - @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry e) { - return e.wrapLazyValue(); - } - }, - new P1<GridDhtCacheEntry>() { - @Override public boolean apply(GridDhtCacheEntry e) { - return !e.obsoleteOrDeleted(); - } - }); - } - }, - new P1<GridDhtLocalPartition>() { - @Override public boolean apply(GridDhtLocalPartition p) { - return p.primary(topVer); - } - })); - - return new GridCacheEntrySet<>(ctx, entries, null); - } - - /** {@inheritDoc} */ - @Override public Set<K> keySet() { - return new GridCacheKeySet<>(ctx, entrySet(), null); - } - - /** {@inheritDoc} */ - @Override public Set<K> primaryKeySet() { - return new GridCacheKeySet<>(ctx, primaryEntrySet(), null); + /** + * @return Keys for near cache only. + */ + public Set<K> nearKeySet() { + return super.keySet(); } /** {@inheritDoc} */ @@ -500,7 +465,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near, boolean readers) { + @Override public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near, + boolean readers) { assert configuration().getNearConfiguration() != null; if (ctx.affinityNode()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 1b30afb..7d29381 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -619,7 +619,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap // Entry was not in memory or in swap, so we remove it from cache. if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver)) - dht.removeIfObsolete(key); + dht.removeEntry(dhtEntry); } if (v != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index 0c2451e..fa7f367 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -19,9 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.io.Externalizable; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; @@ -69,7 +71,11 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep /** */ @GridDirectCollection(KeyCacheObject.class) - private Collection<KeyCacheObject> keys; + private List<KeyCacheObject> keys; + + /** Partition IDs. */ + @GridDirectCollection(int.class) + private List<Integer> partIds; /** */ @GridDirectCollection(boolean.class) @@ -140,8 +146,17 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep this.futId = futId; this.miniId = miniId; this.ver = ver; - this.keys = keys.keySet(); - this.flags = keys.values(); + + this.keys = new ArrayList<>(keys.size()); + flags = new ArrayList<>(keys.size()); + partIds = new ArrayList<>(keys.size()); + + for (Map.Entry<KeyCacheObject, Boolean> entry : keys.entrySet()) { + this.keys.add(entry.getKey()); + flags.add(entry.getValue()); + partIds.add(entry.getKey().partition()); + } + this.readThrough = readThrough; this.topVer = topVer; this.subjId = subjId; @@ -269,6 +284,13 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep while (keysIt.hasNext()) keyMap.put(keysIt.next(), flagsIt.next()); } + + if (partIds != null && !partIds.isEmpty()) { + assert partIds.size() == keys.size(); + + for (int i = 0; i < keys.size(); i++) + keys.get(i).partition(partIds.get(i)); + } } /** {@inheritDoc} */ @@ -363,6 +385,12 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep writer.incrementState(); + case 15: + if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + } return true; @@ -475,6 +503,14 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); + case 15: + partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridNearGetRequest.class); @@ -485,10 +521,9 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep return 49; } - /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 15; + return 16; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index 2a6b0a8..2e8cd6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -445,6 +445,12 @@ public class GridNearLockRequest extends GridDistributedLockRequest { writer.incrementState(); + case 34: + if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + } return true; @@ -573,6 +579,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); + case 34: + partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridNearLockRequest.class); @@ -585,7 +599,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 34; + return 35; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java index aa96720..62d37af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java @@ -62,6 +62,9 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa /** */ private KeyCacheObject key; + /** Partition ID. */ + private int partId = -1; + /** Flags. */ private byte flags; @@ -118,6 +121,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa this.cacheId = cacheId; this.futId = futId; this.key = key; + this.partId = key.partition(); this.topVer = topVer; this.subjId = subjId; this.taskNameHash = taskNameHash; @@ -233,6 +237,8 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa assert key != null; + key.partition(partId); + GridCacheContext cctx = ctx.cacheContext(cacheId); key.finishUnmarshal(cctx.cacheObjectContext(), ldr); @@ -305,6 +311,14 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa reader.incrementState(); + case 10: + partId = reader.readInt("partId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridNearSingleGetRequest.class); @@ -367,6 +381,12 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa writer.incrementState(); + case 10: + if (!writer.writeInt("partId", partId)) + return false; + + writer.incrementState(); + } return true; @@ -384,7 +404,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 10; + return 11; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java index 7652a4a..be78868 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.nio.ByteBuffer; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -61,6 +62,14 @@ public class GridNearUnlockRequest extends GridDistributedUnlockRequest { writer.onHeaderWritten(); } + switch (writer.state()) { + case 8: + if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + } + return true; } @@ -74,6 +83,16 @@ public class GridNearUnlockRequest extends GridDistributedUnlockRequest { if (!super.readFrom(buf, reader)) return false; + switch (reader.state()) { + case 8: + partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + return reader.afterMessageRead(GridNearUnlockRequest.class); } @@ -84,7 +103,7 @@ public class GridNearUnlockRequest extends GridDistributedUnlockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 8; + return 9; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 4ce1f36..16a35d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -80,9 +80,8 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory() { - /** {@inheritDoc} */ + @Override protected GridCacheMapEntryFactory entryFactory() { + return new GridCacheMapEntryFactory() { @Override public GridCacheMapEntry create( GridCacheContext ctx, AffinityTopologyVersion topVer, @@ -92,7 +91,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { ) { return new GridLocalCacheEntry(ctx, key, hash, val); } - }); + }; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 3e0e2c2..cb6152d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -45,19 +45,16 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; -import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCachePreloader; import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.local.GridLocalCacheEntry; +import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.F0; @@ -85,7 +82,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPD /** * Non-transactional local cache. */ -public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { +public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { /** */ private static final long serialVersionUID = 0L; @@ -103,27 +100,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { * @param ctx Cache context. */ public GridLocalAtomicCache(GridCacheContext<K, V> ctx) { - super(ctx, ctx.config().getStartSize()); + super(ctx); preldr = new GridCachePreloaderAdapter(ctx); } /** {@inheritDoc} */ - @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory() { - @Override public GridCacheMapEntry create( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key, - int hash, - CacheObject val - ) { - return new GridLocalCacheEntry(ctx, key, hash, val); - } - }); - } - - /** {@inheritDoc} */ @Override public boolean isLocal() { return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index e08f9b0..87b8e07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -25,7 +25,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 17bed0b..b8447f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -40,7 +40,6 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index e75ce91..4d8139f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -98,6 +98,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Cache ID. */ private int cacheId; + /** Partition ID. */ + private int partId = -1; + /** Transient tx key. */ @GridDirectTransient private IgniteTxKey txKey; @@ -251,11 +254,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { keepBinary(keepBinary); key = entry.key(); + partId = entry.key().partition(); cacheId = entry.context().cacheId(); } - /** + /** * This constructor is meant for local transactions. * * @param ctx Cache registry. @@ -303,6 +307,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { addEntryProcessor(entryProcessor, invokeArgs); key = entry.key(); + partId = entry.key().partition(); cacheId = entry.context().cacheId(); } @@ -336,6 +341,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { IgniteTxEntry cp = new IgniteTxEntry(); cp.key = key; + cp.partId = partId; cp.cacheId = cacheId; cp.ctx = ctx; @@ -476,7 +482,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * * @param skipStore Skip store flag. */ - public void skipStore(boolean skipStore){ + public void skipStore(boolean skipStore) { setFlag(skipStore, SKIP_STORE_FLAG_MASK); } @@ -853,7 +859,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * @param clsLdr Class loader. * @throws IgniteCheckedException If un-marshalling failed. */ - public void unmarshal(GridCacheSharedContext<?, ?> ctx, boolean near, ClassLoader clsLdr) throws IgniteCheckedException { + public void unmarshal(GridCacheSharedContext<?, ?> ctx, boolean near, + ClassLoader clsLdr) throws IgniteCheckedException { if (this.ctx == null) { GridCacheContext<?, ?> cacheCtx = ctx.cacheContext(cacheId); @@ -884,6 +891,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { key.finishUnmarshal(context().cacheObjectContext(), clsLdr); + key.partition(partId); + val.unmarshal(this.ctx, clsLdr); if (expiryPlcBytes != null && expiryPlc == null) @@ -933,7 +942,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** * @param ver Entry version. */ - public void entryReadVersion(GridCacheVersion ver) { + public void entryReadVersion(GridCacheVersion ver) { assert this.serReadVer == null; assert ver != null; @@ -1037,6 +1046,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { writer.incrementState(); + case 12: + if (!writer.writeInt("partId", partId)) + return false; + + writer.incrementState(); + } return true; @@ -1146,6 +1161,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); + case 12: + partId = reader.readInt("partId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(IgniteTxEntry.class); @@ -1158,7 +1181,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 12; + return 13; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index cadf1a9..686f308 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -136,6 +136,16 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { /** * @param ctx Cache context. + * @param obj Key value. + * @param userObj If {@code true} then given object is object provided by user and should be copied + * before stored in cache. + * @param partition ID of partition this key belongs to. + * @return Cache key object. + */ + public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj, int partition); + + /** + * @param ctx Cache context. * @param obj Object. * @param userObj If {@code true} then given object is object provided by user and should be copied * before stored in cache. http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index 9a70911..9b47e59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -102,18 +102,24 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** {@inheritDoc} */ @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr) - throws IgniteCheckedException - { + throws IgniteCheckedException { return ctx.kernalContext().cache().context().marshaller().unmarshal(bytes, U.resolveClassLoader(clsLdr, ctx.kernalContext().config())); } /** {@inheritDoc} */ @Override @Nullable public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj) { - if (obj instanceof KeyCacheObject) + return toCacheKeyObject(ctx, obj, userObj, -1); + } + + /** {@inheritDoc} */ + @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj, int partition) { + if (obj instanceof KeyCacheObject) { + ((KeyCacheObject)obj).partition(partition); return (KeyCacheObject)obj; + } - return toCacheKeyObject0(obj, userObj); + return toCacheKeyObject0(obj, userObj, partition); } /** @@ -123,17 +129,16 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme * @return Key cache object. */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - protected KeyCacheObject toCacheKeyObject0(Object obj, boolean userObj) { + protected KeyCacheObject toCacheKeyObject0(Object obj, boolean userObj, int partititon) { if (!userObj) - return new KeyCacheObjectImpl(obj, null); + return new KeyCacheObjectImpl(obj, null, partititon); - return new UserKeyCacheObjectImpl(obj); + return new UserKeyCacheObjectImpl(obj, partititon); } /** {@inheritDoc} */ @Override public CacheObject toCacheObject(GridCacheContext ctx, long valPtr, boolean tmp) - throws IgniteCheckedException - { + throws IgniteCheckedException { assert valPtr != 0; int size = GridUnsafe.getInt(valPtr); @@ -172,8 +177,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** {@inheritDoc} */ @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, - boolean userObj) - { + boolean userObj) { if (obj == null || obj instanceof CacheObject) return (CacheObject)obj; @@ -241,7 +245,6 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme return 0; } - /** {@inheritDoc} */ @Override public Object unwrapTemporary(GridCacheContext ctx, Object obj) throws IgniteException { return obj; @@ -290,7 +293,14 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme * @param key Key. */ UserKeyCacheObjectImpl(Object key) { - super(key, null); + this(key, -1); + } + + /** + * @param key Key. + */ + UserKeyCacheObjectImpl(Object key, int partition) { + super(key, null, partition); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index b6cbf3b..448dd8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@ -36,9 +36,9 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe; /** * Cache atomic reference implementation. http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/util/PartitionedReadOnlySet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/PartitionedReadOnlySet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/PartitionedReadOnlySet.java new file mode 100644 index 0000000..5ab75c1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/PartitionedReadOnlySet.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.util; + +import java.util.AbstractSet; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; +import org.apache.ignite.internal.util.typedef.F; + +/** + * Read-only wrapper over multiple sets. + */ +public class PartitionedReadOnlySet<T> extends AbstractSet<T> { + /** */ + private final Collection<Set<T>> sets; + + /** + * Constructor. + * @param sets Internal sets. + */ + public PartitionedReadOnlySet(Collection<Set<T>> sets) { + this.sets = sets; + } + + /** {@inheritDoc} */ + @Override public Iterator<T> iterator() { + Collection<Iterator<T>> iterators = new ArrayList<>(sets.size()); + + for (Set<T> set : sets) + iterators.add(set.iterator()); + + return F.flatIterators(iterators); + } + + /** {@inheritDoc} */ + @Override public int size() { + int size = 0; + + for (Set<T> set : sets) + size += set.size(); + + return size; + } + + /** {@inheritDoc} */ + @Override public boolean contains(Object o) { + for (Set<T> set : sets) + if (set.contains(o)) + return true; + + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java index 96445d8..12940e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java @@ -86,6 +86,16 @@ public class StripedCompositeReadWriteLock implements ReadWriteLock { } /** + * Queries if the write lock is held by the current thread. + * + * @return {@code true} if the current thread holds the write lock and + * {@code false} otherwise + */ + public boolean isWriteLockedByCurrentThread() { + return locks[locks.length - 1].isWriteLockedByCurrentThread(); + } + + /** * Read lock. */ @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 8234dc4..ab31625 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -287,7 +287,6 @@ public class GridFunc { } }; - /** */ private static final IgniteClosure CACHE_ENTRY_VAL_GET = new IgniteClosure() { @SuppressWarnings({"unchecked"}) @@ -4665,4 +4664,4 @@ public class GridFunc { public static IgnitePredicate<IgniteInternalFuture<?>> unfinishedFutures() { return UNFINISHED_FUTURE; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java index b5151c7..1be7af8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java @@ -233,9 +233,9 @@ public class VisorCache implements Serializable { protected void estimateMemorySize(IgniteEx ignite, GridCacheAdapter ca, int sample) throws IgniteCheckedException { int size = ca.size(); - Set<GridCacheEntryEx> set = ca.context().isNear() - ? ((GridNearCacheAdapter)ca).dht().map().entries0() - : ca.map().entries0(); + Iterable<GridCacheEntryEx> set = ca.context().isNear() + ? ((GridNearCacheAdapter)ca).dht().entries() + : ca.entries(); long memSz = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java index ba43454..bcdab3e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java @@ -108,7 +108,7 @@ public class GridCachePreloadingEvictionsSelfTest extends GridCommonAbstractTest try { final Ignite ignite1 = startGrid(1); - IgniteCache<Integer, Object> cache1 = ignite1.cache(null); + final IgniteCache<Integer, Object> cache1 = ignite1.cache(null); for (int i = 0; i < 5000; i++) cache1.put(i, VALUE + i); @@ -129,7 +129,7 @@ public class GridCachePreloadingEvictionsSelfTest extends GridCommonAbstractTest info("Started evicting..."); for (int i = 0; i < 3000 && !done.get(); i++) { - Cache.Entry<Integer, Object> entry = randomEntry(ignite1); + Cache.Entry<Integer, Object> entry = cache1.getEntry(i); if (entry != null) ignite1.cache(null).localEvict(Collections.<Object>singleton(entry.getKey())); http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java index 5e746c3..7b4a5ac 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java @@ -112,7 +112,7 @@ public class GridCacheTtlManagerSelfTest extends GridCommonAbstractTest { assertNull(g.cache(null).get(key)); if (!g.internalCache().context().deferredDelete()) - assertNull(g.internalCache().map().getEntry(key)); + assertNull(g.internalCache().map().getEntry(g.internalCache().context().toCacheKeyObject(key))); } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java index dc6a503..aec448b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.F; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; @@ -432,7 +433,7 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach for (Object ignored : keys) itSize++; - int valsSize = cache.values().size(); + int valsSize = F.size(cache.values().iterator()); info("cacheSize=" + cacheSize + ", keysSize=" + keySetSize + ", valsSize=" + valsSize + ", itSize=" + itSize + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java index b661b52..a88ad15 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java @@ -28,8 +28,6 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridNoStorageCacheMap; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -313,14 +311,6 @@ public class IgniteDynamicClientCacheStartSelfTest extends GridCommonAbstractTes assertEquals(near, cache.context().isNear()); - if (near) - cache = ((GridNearCacheAdapter)cache).dht(); - - if (srv) - assertSame(GridCacheConcurrentMap.class, cache.map().getClass()); - else - assertSame(GridNoStorageCacheMap.class, cache.map().getClass()); - ClusterNode node = ((IgniteKernal)ignite).localNode(); for (Ignite ignite0 : Ignition.allGrids()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java index 9c8f7f8..b65b441 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java @@ -369,7 +369,7 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb GridCacheAdapter cache = grid.internalCache(null); - GridCacheMapEntry entry = cache.map().getEntry(key); + GridCacheMapEntry entry = cache.map().getEntry(cache.context().toCacheKeyObject(key)); log.info("Entry: " + entry); @@ -378,11 +378,11 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore, entry.hasValue()); assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore ? 1 : null, - entry.rawGetOrUnmarshal(false)); + entry.rawGetOrUnmarshal(false).value(cache.ctx.cacheObjectContext(), false)); } if (cache.isNear()) { - entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(key); + entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(cache.context().toCacheKeyObject(key)); log.info("Dht entry: " + entry); @@ -391,7 +391,7 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore, entry.hasValue()); assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore ? 1 : null, - entry.rawGetOrUnmarshal(false)); + entry.rawGetOrUnmarshal(false).value(cache.ctx.cacheObjectContext(), false)); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java index f776146..be20b68 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java @@ -145,7 +145,7 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA for (int i = 0; i < gridCount(); i++) { GridCacheAdapter<Object, Object> c = ((IgniteKernal)grid(i)).internalCache(); - for (GridCacheEntryEx e : c.map().entries0()) { + for (GridCacheEntryEx e : c.map().entries()) { Object key = e.key().value(c.context().cacheObjectContext(), false); Object val = CU.value(e.rawGet(), c.context(), false); http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java index 2504105..45b4b9f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java @@ -364,7 +364,7 @@ public abstract class GridCacheAbstractQueueFailoverDataConsistencySelfTest exte GridCacheAffinityManager aff = cctx.affinity(); for (int i = 0; i < gridCount(); i++) { - for (GridCacheEntryEx e : ((IgniteKernal)grid(i)).context().cache().internalCache(cctx.name()).map().allEntries0()) { + for (GridCacheEntryEx e : ((IgniteKernal)grid(i)).context().cache().internalCache(cctx.name()).allEntries()) { if (aff.primary(grid(i).localNode(), e.key(), AffinityTopologyVersion.NONE) && e.key().value(cctx.cacheObjectContext(), false) instanceof GridCacheQueueHeaderKey) return i; http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java index d67e725..1eac282 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java @@ -31,7 +31,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.datastructures.GridCacheQueueHeaderKey; import org.apache.ignite.internal.util.typedef.PAX; import org.apache.ignite.testframework.GridTestUtils; @@ -191,7 +191,7 @@ public class GridCacheQueueCleanupSelfTest extends IgniteCollectionAbstractTest GridCacheAdapter<Object, Object> cache = ((IgniteKernal)grid(i)).context().cache().internalCache(queueCacheName); - Iterator<GridCacheEntryEx> entries = cache.map().allEntries0().iterator(); + Iterator<GridCacheMapEntry> entries = cache.map().entries().iterator(); while (entries.hasNext()) { cnt++;
