2224
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01135066 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01135066 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01135066 Branch: refs/heads/sql-store-cmp Commit: 01135066d54df254a0b23afbbffca2ed103e3a8c Parents: 62502b2 Author: Anton Vinogradov <a...@apache.org> Authored: Tue Feb 2 15:25:05 2016 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Tue Feb 2 15:25:05 2016 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 45 +- .../processors/cache/CacheEntryImplEx.java | 14 +- .../processors/cache/GridCacheAdapter.java | 297 +++++-- .../processors/cache/GridCacheContext.java | 33 +- .../processors/cache/GridCacheMapEntry.java | 2 +- .../processors/cache/GridCacheProxyImpl.java | 51 ++ .../processors/cache/IgniteCacheProxy.java | 51 ++ .../processors/cache/IgniteInternalCache.java | 85 ++ .../dht/CacheDistributedGetFutureAdapter.java | 15 - .../distributed/dht/GridDhtCacheAdapter.java | 7 +- .../cache/distributed/dht/GridDhtGetFuture.java | 6 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 4 +- .../dht/GridPartitionedGetFuture.java | 38 +- .../dht/GridPartitionedSingleGetFuture.java | 17 +- .../dht/atomic/GridDhtAtomicCache.java | 82 +- .../dht/colocated/GridDhtColocatedCache.java | 42 +- .../distributed/near/GridNearAtomicCache.java | 6 +- .../distributed/near/GridNearCacheAdapter.java | 6 +- .../distributed/near/GridNearCacheEntry.java | 3 +- .../distributed/near/GridNearGetFuture.java | 45 +- ...arOptimisticSerializableTxPrepareFuture.java | 2 +- .../near/GridNearOptimisticTxPrepareFuture.java | 4 + .../GridNearPessimisticTxPrepareFuture.java | 2 + .../near/GridNearTransactionalCache.java | 9 +- .../local/atomic/GridLocalAtomicCache.java | 97 ++- .../cache/transactions/IgniteTxEntry.java | 32 +- .../transactions/IgniteTxLocalAdapter.java | 196 +++-- .../cache/transactions/IgniteTxLocalEx.java | 3 +- .../cache/transactions/IgniteTxManager.java | 2 +- .../cache/CacheGetEntryAbstractTest.java | 803 +++++++++++++++++++ ...GetEntryOptimisticReadCommittedSeltTest.java | 36 + ...etEntryOptimisticRepeatableReadSeltTest.java | 36 + ...eGetEntryOptimisticSerializableSeltTest.java | 36 + ...etEntryPessimisticReadCommittedSeltTest.java | 36 + ...tEntryPessimisticRepeatableReadSeltTest.java | 36 + ...GetEntryPessimisticSerializableSeltTest.java | 36 + .../cache/CacheReadThroughRestartSelfTest.java | 43 +- .../CacheSerializableTransactionsTest.java | 142 +++- .../cache/GridCacheAbstractFullApiSelfTest.java | 141 ++++ .../GridCacheInterceptorAbstractSelfTest.java | 172 +++- ...GridCacheDhtEvictionNearReadersSelfTest.java | 2 +- .../multijvm/IgniteCacheProcessProxy.java | 59 +- .../testsuites/IgniteCacheTestSuite4.java | 12 + .../config/benchmark-multicast.properties | 7 + .../IgniteGetEntriesPutAllTxBenchmark.java | 73 ++ .../cache/IgnitePutGetEntryBenchmark.java | 47 ++ .../cache/IgnitePutGetEntryTxBenchmark.java | 73 ++ 47 files changed, 2644 insertions(+), 342 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 886dca6..a791e38 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -31,10 +31,12 @@ import javax.cache.CacheException; import javax.cache.configuration.Configuration; import javax.cache.event.CacheEntryRemovedListener; import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CacheLoader; import javax.cache.integration.CacheWriter; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.EntryProcessorResult; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CacheMode; @@ -390,18 +392,59 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * <code>null</code> value for a key. */ @IgniteAsyncSupported - <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, + public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args); /** {@inheritDoc} */ @IgniteAsyncSupported @Override public V get(K key); + /** + * Gets an entry from the cache. + * <p> + * If the cache is configured to use read-through, and get would return null + * because the entry is missing from the cache, the Cache's {@link CacheLoader} + * is called in an attempt to load the entry. + * + * @param key the key whose associated value is to be returned + * @return the element, or null, if it does not exist. + * @throws IllegalStateException if the cache is {@link #isClosed()} + * @throws NullPointerException if the key is null + * @throws CacheException if there is a problem fetching the value + * @throws ClassCastException if the implementation is configured to perform + * runtime-type-checking, and the key or value types are incompatible with those that have been + * configured for the {@link Cache} + */ + @IgniteAsyncSupported + public CacheEntry<K, V> getEntry(K key); + /** {@inheritDoc} */ @IgniteAsyncSupported @Override public Map<K, V> getAll(Set<? extends K> keys); /** + * Gets a collection of entries from the {@link Cache}. + * <p> + * If the cache is configured read-through, and a get for a key would + * return null because an entry is missing from the cache, the Cache's + * {@link CacheLoader} is called in an attempt to load the entry. If an + * entry cannot be loaded for a given key, the key will not be present in + * the returned Collection. + * + * @param keys The keys whose associated values are to be returned. + * @return A collection of entries that were found for the given keys. Entries not found + * in the cache are not in the returned collection. + * @throws NullPointerException if keys is null or if keys contains a null + * @throws IllegalStateException if the cache is {@link #isClosed()} + * @throws CacheException if there is a problem fetching the values + * @throws ClassCastException if the implementation is configured to perform + * runtime-type-checking, and the key or value types are incompatible with those that have been + * configured for the {@link Cache} + */ + @IgniteAsyncSupported + public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys); + + /** * Gets values from cache. Will bypass started transaction, if any, i.e. will not enlist entries * and will not lock any keys if pessimistic transaction is started by thread. * http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java index 1c7111a..af926c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java @@ -21,9 +21,13 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED; + /** * */ @@ -54,6 +58,14 @@ public class CacheEntryImplEx<K, V> extends CacheEntryImpl<K, V> implements Cach /** {@inheritDoc} */ public GridCacheVersion version() { + if (ver == GET_ENTRY_INVALID_VER_AFTER_GET) { + throw new IgniteException("Impossible to get entry version after " + + "get() inside OPTIMISTIC REPEATABLE_READ transaction. Use only getEntry() or getEntries() inside " + + "OPTIMISTIC REPEATABLE_READ transaction to solve this problem."); + } + else if (ver == GET_ENTRY_INVALID_VER_UPDATED) + throw new IgniteException("Impossible to get version for entry updated in transaction."); + return ver; } @@ -81,7 +93,7 @@ public class CacheEntryImplEx<K, V> extends CacheEntryImpl<K, V> implements Cach String res = "CacheEntry [key=" + getKey() + ", val=" + getValue(); - if (ver != null) { + if (ver != null && ver != GET_ENTRY_INVALID_VER_AFTER_GET && ver != GET_ENTRY_INVALID_VER_UPDATED) { res += ", topVer=" + ver.topologyVersion() + ", nodeOrder=" + ver.nodeOrder() + ", order=" + ver.order() + http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 9fd65e5..69abc54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -52,6 +52,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheInterceptor; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CachePeekMode; @@ -607,7 +608,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /*task name*/null, /*deserialize binary*/false, /*skip values*/true, - /*can remap*/true + /*can remap*/true, + false ); } @@ -633,7 +635,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /*task name*/null, /*deserialize binary*/false, /*skip values*/true, - /*can remap*/true + /*can remap*/true, + false ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() { @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException { Map<K, V> kvMap = fut.get(); @@ -1296,7 +1299,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V taskName, /*deserialize cache objects*/true, /*skip values*/false, - /*can remap*/true + /*can remap*/true, + false ).get().get(key); } @@ -1312,7 +1316,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V taskName, true, false, - /*can remap*/true + /*can remap*/true, + false ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() { @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { return e.get().get(key); @@ -1332,7 +1337,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V taskName, /*deserialize cache objects*/true, /*skip values*/false, - /*can remap*/false + /*can remap*/false, + false ).get().get(key); } @@ -1352,7 +1358,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V taskName, !ctx.keepBinary(), /*skip values*/false, - /*can remap*/true); + /*can remap*/true, + false); } /** @@ -1372,7 +1379,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V long start = statsEnabled ? System.nanoTime() : 0L; - V val = get(key, !ctx.keepBinary()); + V val = get(key, !ctx.keepBinary(), false); if (ctx.config().getInterceptor() != null) val = (V)ctx.config().getInterceptor().onGet(key, val); @@ -1384,6 +1391,30 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ + @Nullable @Override public CacheEntry<K, V> getEntry(K key) throws IgniteCheckedException { + A.notNull(key, "key"); + + boolean statsEnabled = ctx.config().isStatisticsEnabled(); + + long start = statsEnabled ? System.nanoTime() : 0L; + + T2<V, GridCacheVersion> t = (T2<V, GridCacheVersion>)get(key, !ctx.keepBinary(), true); + + CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()): null; + + if (ctx.config().getInterceptor() != null) { + V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); + + val = (val0 != null) ? new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null) : null; + } + + if (statsEnabled) + metrics0().addGetTimeNanos(System.nanoTime() - start); + + return val; + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> getAsync(final K key) { A.notNull(key, "key"); @@ -1391,7 +1422,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final long start = statsEnabled ? System.nanoTime() : 0L; - IgniteInternalFuture<V> fut = getAsync(key, !ctx.keepBinary()); + IgniteInternalFuture<V> fut = getAsync(key, !ctx.keepBinary(), false); if (ctx.config().getInterceptor() != null) fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() { @@ -1407,6 +1438,42 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<CacheEntry<K, V>> getEntryAsync(final K key) { + A.notNull(key, "key"); + + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + + IgniteInternalFuture<T2<V, GridCacheVersion>> fut = + (IgniteInternalFuture<T2<V, GridCacheVersion>>)getAsync(key, !ctx.keepBinary(), true); + + final boolean intercept = ctx.config().getInterceptor() != null; + + IgniteInternalFuture<CacheEntry<K, V>> fr = fut.chain( + new CX1<IgniteInternalFuture<T2<V, GridCacheVersion>>, CacheEntry<K, V>>() { + @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f) + throws IgniteCheckedException { + T2<V, GridCacheVersion> t = f.get(); + + CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null; + if (intercept) { + V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); + + return new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null); + } + else + return val; + } + }); + + if (statsEnabled) + fut.listen(new UpdateGetTimeStatClosure<T2<V, GridCacheVersion>>(metrics0(), start)); + + return fr; + } + + /** {@inheritDoc} */ @Override public Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { A.notNull(keys, "keys"); @@ -1414,7 +1481,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V long start = statsEnabled ? System.nanoTime() : 0L; - Map<K, V> map = getAll(keys, !ctx.keepBinary()); + Map<K, V> map = getAll(keys, !ctx.keepBinary(), false); if (ctx.config().getInterceptor() != null) map = interceptGet(keys, map); @@ -1426,6 +1493,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ + @Override public Collection<CacheEntry<K, V>> getEntries(@Nullable Collection<? extends K> keys) + throws IgniteCheckedException { + A.notNull(keys, "keys"); + + boolean statsEnabled = ctx.config().isStatisticsEnabled(); + + long start = statsEnabled ? System.nanoTime() : 0L; + + Map<K, T2<V, GridCacheVersion>> map = (Map<K, T2<V, GridCacheVersion>>)getAll(keys, !ctx.keepBinary(), true); + + Collection<CacheEntry<K, V>> res = new HashSet<>(); + + if (ctx.config().getInterceptor() != null) + res = interceptGetEntries(keys, map); + else + for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet()) + res.add(new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2())); + + if (statsEnabled) + metrics0().addGetTimeNanos(System.nanoTime() - start); + + return res; + } + + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys) { A.notNull(keys, "keys"); @@ -1433,7 +1526,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final long start = statsEnabled ? System.nanoTime() : 0L; - IgniteInternalFuture<Map<K, V>> fut = getAllAsync(keys, !ctx.keepBinary()); + IgniteInternalFuture<Map<K, V>> fut = getAllAsync(keys, !ctx.keepBinary(), false); if (ctx.config().getInterceptor() != null) return fut.chain(new CX1<IgniteInternalFuture<Map<K, V>>, Map<K, V>>() { @@ -1448,6 +1541,45 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return fut; } + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync( + @Nullable final Collection<? extends K> keys) { + A.notNull(keys, "keys"); + + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + + IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> fut = + (IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>) + ((IgniteInternalFuture)getAllAsync(keys, !ctx.keepBinary(), true)); + + final boolean intercept = ctx.config().getInterceptor() != null; + + IgniteInternalFuture<Collection<CacheEntry<K, V>>> rf = + fut.chain(new CX1<IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>, Collection<CacheEntry<K, V>>>() { + @Override public Collection<CacheEntry<K, V>> applyx( + IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> f) throws IgniteCheckedException { + if (intercept) + return interceptGetEntries(keys, f.get()); + else { + Map<K, CacheEntry<K, V>> res = U.newHashMap(f.get().size()); + + for (Map.Entry<K, T2<V, GridCacheVersion>> e : f.get().entrySet()) + res.put(e.getKey(), + new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2())); + + return res.values(); + } + } + }); + + if (statsEnabled) + fut.listen(new UpdateGetTimeStatClosure<Map<K, T2<V, GridCacheVersion>>>(metrics0(), start)); + + return rf; + } + /** * Applies cache interceptor on result of 'get' operation. * @@ -1490,6 +1622,53 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * Applies cache interceptor on result of 'getEntries' operation. + * + * @param keys All requested keys. + * @param map Result map. + * @return Map with values returned by cache interceptor.. + */ + @SuppressWarnings("IfMayBeConditional") + private Collection<CacheEntry<K, V>> interceptGetEntries( + @Nullable Collection<? extends K> keys, Map<K, T2<V, GridCacheVersion>> map) { + Map<K, CacheEntry<K, V>> res; + + if (F.isEmpty(keys)) { + assert map.isEmpty(); + + return Collections.emptySet(); + } + + res = U.newHashMap(keys.size()); + + CacheInterceptor<K, V> interceptor = cacheCfg.getInterceptor(); + + assert interceptor != null; + + for (Map.Entry<K, T2<V, GridCacheVersion>> e : map.entrySet()) { + V val = interceptor.onGet(e.getKey(), e.getValue().get1()); + + if (val != null) + res.put(e.getKey(), new CacheEntryImplEx<>(e.getKey(), val, e.getValue().get2())); + } + + if (map.size() != keys.size()) { // Not all requested keys were in cache. + for (K key : keys) { + if (key != null) { + if (!map.containsKey(key)) { + V val = interceptor.onGet(key, null); + + if (val != null) + res.put(key, new CacheEntryImplEx<>(key, val, null)); + } + } + } + } + + return res.values(); + } + + /** * @param key Key. * @param forcePrimary Force primary. * @param skipTx Skip tx. @@ -1498,6 +1677,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param deserializeBinary Deserialize binary. * @param skipVals Skip values. * @param canRemap Can remap flag. + * @param needVer Need version. * @return Future for the get operation. */ protected IgniteInternalFuture<V> getAsync( @@ -1508,7 +1688,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V String taskName, boolean deserializeBinary, final boolean skipVals, - boolean canRemap + boolean canRemap, + final boolean needVer ) { return getAllAsync(Collections.singletonList(key), forcePrimary, @@ -1517,7 +1698,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V taskName, deserializeBinary, skipVals, - canRemap).chain( + canRemap, + needVer).chain( new CX1<IgniteInternalFuture<Map<K, V>>, V>() { @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { Map<K, V> map = e.get(); @@ -1544,6 +1726,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param deserializeBinary Deserialize binary. * @param skipVals Skip values. * @param canRemap Can remap flag. + * @param needVer Need version. * @return Future for the get operation. * @see GridCacheAdapter#getAllAsync(Collection) */ @@ -1555,7 +1738,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V String taskName, boolean deserializeBinary, boolean skipVals, - boolean canRemap + boolean canRemap, + final boolean needVer ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -1570,7 +1754,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V forcePrimary, skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), skipVals, - canRemap); + canRemap, + needVer); } /** @@ -1584,6 +1769,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param expiry Expiry policy. * @param skipVals Skip values. * @param canRemap Can remap flag. + * @param needVer Need version. * @return Future for the get operation. * @see GridCacheAdapter#getAllAsync(Collection) */ @@ -1596,7 +1782,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final boolean forcePrimary, @Nullable IgniteCacheExpiryPolicy expiry, final boolean skipVals, - boolean canRemap + boolean canRemap, + final boolean needVer ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -1613,7 +1800,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V skipVals, false, canRemap, - false); + needVer); } /** @@ -1708,20 +1895,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.evicts().touch(entry, topVer); } else { - if (needVer) { - assert keepCacheObjects; - - map.put((K1)key, (V1)new T2<>(res.get1(), res.get2())); - } - else { - ctx.addResult(map, - key, - res.get1(), - skipVals, - keepCacheObjects, - deserializeBinary, - true); - } + ctx.addResult(map, + key, + res.get1(), + skipVals, + keepCacheObjects, + deserializeBinary, + true, + needVer ? res.get2() : null); if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) ctx.evicts().touch(entry, topVer); @@ -1783,20 +1964,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Don't put key-value pair into result map if value is null. if (val != null) { - if (needVer) { - assert keepCacheObjects; - - map.put((K1)key, (V1)new T2<>(cacheVal, set ? verSet : ver)); - } - else { - ctx.addResult(map, - key, - cacheVal, - skipVals, - keepCacheObjects, - deserializeBinary, - false); - } + ctx.addResult(map, + key, + cacheVal, + skipVals, + keepCacheObjects, + deserializeBinary, + false, + needVer ? set ? verSet : ver : null); } if (tx0 == null || (!tx0.implicit() && @@ -1889,11 +2064,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } } else { - assert !needVer; - return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) { @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) { - return tx.getAllAsync(ctx, keys, deserializeBinary, skipVals, false, !readThrough); + return tx.getAllAsync(ctx, keys, deserializeBinary, skipVals, false, !readThrough, needVer); } }, ctx.operationContextPerCall()); } @@ -4494,28 +4667,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param key Key. * @param deserializeBinary Deserialize binary flag. + * @param needVer Need version. * @return Cached value. * @throws IgniteCheckedException If failed. */ - @Nullable public V get(K key, boolean deserializeBinary) throws IgniteCheckedException { + @Nullable public V get(K key, boolean deserializeBinary, final boolean needVer) throws IgniteCheckedException { checkJta(); String taskName = ctx.kernalContext().job().currentTaskName(); - return get(key, taskName, deserializeBinary); + return get(key, taskName, deserializeBinary, needVer); } /** * @param key Key. * @param taskName Task name. * @param deserializeBinary Deserialize binary flag. + * @param needVer Need version. * @return Cached value. * @throws IgniteCheckedException If failed. */ protected V get( final K key, String taskName, - boolean deserializeBinary) throws IgniteCheckedException { + boolean deserializeBinary, + boolean needVer) throws IgniteCheckedException { return getAsync(key, !ctx.config().isReadFromBackup(), /*skip tx*/false, @@ -4523,15 +4699,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V taskName, deserializeBinary, false, - /*can remap*/true).get(); + /*can remap*/true, + needVer).get(); } /** * @param key Key. * @param deserializeBinary Deserialize binary flag. + * @param needVer Need version. * @return Read operation future. */ - public final IgniteInternalFuture<V> getAsync(final K key, boolean deserializeBinary) { + public final IgniteInternalFuture<V> getAsync(final K key, boolean deserializeBinary, final boolean needVer) { try { checkJta(); } @@ -4548,28 +4726,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V taskName, deserializeBinary, false, - /*can remap*/true); + /*can remap*/true, + needVer); } /** * @param keys Keys. * @param deserializeBinary Deserialize binary flag. + * @param needVer Need version. * @return Map of cached values. * @throws IgniteCheckedException If read failed. */ - public Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary) throws IgniteCheckedException { + public Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary, + boolean needVer) throws IgniteCheckedException { checkJta(); - return getAllAsync(keys, deserializeBinary).get(); + return getAllAsync(keys, deserializeBinary, needVer).get(); } /** * @param keys Keys. * @param deserializeBinary Deserialize binary flag. + * @param needVer Need version. * @return Read future. */ public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys, - boolean deserializeBinary) { + boolean deserializeBinary, boolean needVer) { String taskName = ctx.kernalContext().job().currentTaskName(); return getAllAsync(keys, @@ -4579,7 +4761,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V taskName, deserializeBinary, /*skip vals*/false, - /*can remap*/true); + /*can remap*/true, + needVer); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index fc48b9d..e875df0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -49,12 +49,12 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.deployment.GridDeploymentManager; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManager; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; @@ -91,6 +91,7 @@ import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; @@ -1882,6 +1883,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @param keepCacheObjects Keep cache objects flag. * @param deserializeBinary Deserialize binary flag. * @param cpy Copy flag. + * @param ver GridCacheVersion. */ @SuppressWarnings("unchecked") public <K1, V1> void addResult(Map<K1, V1> map, @@ -1890,7 +1892,8 @@ public class GridCacheContext<K, V> implements Externalizable { boolean skipVals, boolean keepCacheObjects, boolean deserializeBinary, - boolean cpy) { + boolean cpy, + final GridCacheVersion ver) { assert key != null; assert val != null || skipVals; @@ -1902,10 +1905,32 @@ public class GridCacheContext<K, V> implements Externalizable { assert key0 != null : key; assert val0 != null : val; - map.put((K1)key0, (V1)val0); + map.put((K1)key0, ver != null ? (V1)new T2<>(val0, ver) : (V1)val0); } else - map.put((K1)key, (V1)(skipVals ? true : val)); + map.put((K1)key, + (V1)(ver != null ? + (V1)new T2<>(skipVals ? true : val, ver) : + skipVals ? true : val)); + } + + /** + * @param map Map. + * @param key Key. + * @param val Value. + * @param skipVals Skip values flag. + * @param keepCacheObjects Keep cache objects flag. + * @param deserializeBinary Deserialize binary flag. + * @param cpy Copy flag. + */ + public <K1, V1> void addResult(Map<K1, V1> map, + KeyCacheObject key, + CacheObject val, + boolean skipVals, + boolean keepCacheObjects, + boolean deserializeBinary, + boolean cpy) { + addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, cpy, null); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 2d25d16..64cfd01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -882,7 +882,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updateTtl(expiryPlc); if (retVer) { - resVer = isNear() ? ((GridNearCacheEntry)this).dhtVersion() : this.ver; + resVer = (isNear() && cctx.transactional()) ? ((GridNearCacheEntry)this).dhtVersion() : this.ver; if (resVer == null) ret = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 3a53942..9b4aff3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -31,6 +31,7 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; @@ -307,6 +308,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Nullable @Override public CacheEntry<K, V> getEntry(K key) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.getEntry(key); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public V getTopologySafe(K key) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); @@ -331,6 +344,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<CacheEntry<K, V>> getEntryAsync(K key) { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.getEntryAsync(key); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public V getForcePrimary(K key) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); @@ -451,6 +476,19 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public Collection<CacheEntry<K, V>> getEntries( + @Nullable Collection<? extends K> keys) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.getEntries(keys); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys) { CacheOperationContext prev = gate.enter(opCtx); @@ -463,6 +501,19 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync( + @Nullable Collection<? extends K> keys) { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.getEntriesAsync(keys); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Nullable @Override public V getAndPut(K key, V val) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 9e66d4d..5ed8753 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -44,6 +44,7 @@ import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheManager; import org.apache.ignite.cache.CacheMetrics; @@ -873,6 +874,31 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public CacheEntry<K, V> getEntry(K key) { + try { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + if (isAsync()) { + setFuture(delegate.getEntryAsync(key)); + + return null; + } + else + return delegate.getEntry(key); + } + finally { + onLeave(gate, prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ @Override public Map<K, V> getAll(Set<? extends K> keys) { try { GridCacheGateway<K, V> gate = this.gate; @@ -898,6 +924,31 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) { + try { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + if (isAsync()) { + setFuture(delegate.getEntriesAsync(keys)); + + return null; + } + else + return delegate.getEntries(keys); + } + finally { + onLeave(gate, prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) { try { GridCacheGateway<K, V> gate = this.gate; http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 433290c..68d0f06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -31,6 +31,7 @@ import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; @@ -335,6 +336,28 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { @Nullable public V get(K key) throws IgniteCheckedException; /** + * Retrieves value mapped to the specified key from cache. Value will only be returned if + * its entry passed the optional filter provided. Filter check is atomic, and therefore the + * returned value is guaranteed to be consistent with the filter. The return value of {@code null} + * means entry did not pass the provided filter or cache has no mapping for the + * key. + * <p> + * If the value is not present in cache, then it will be looked up from swap storage. If + * it's not present in swap, or if swap is disable, and if read-through is allowed, value + * will be loaded from {@link CacheStore} persistent storage via + * <code>CacheStore#load(Transaction, Object)</code> method. + * <h2 class="header">Transactions</h2> + * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. + * + * @param key Key to retrieve the value for. + * @return Value for the given key. + * @throws IgniteCheckedException If get operation failed. + * @throws NullPointerException if the key is {@code null}. + */ + @Nullable public CacheEntry<K, V> getEntry(K key) throws IgniteCheckedException; + + /** * Asynchronously retrieves value mapped to the specified key from cache. Value will only be returned if * its entry passed the optional filter provided. Filter check is atomic, and therefore the * returned value is guaranteed to be consistent with the filter. The return value of {@code null} @@ -356,6 +379,27 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public IgniteInternalFuture<V> getAsync(K key); /** + * Asynchronously retrieves value mapped to the specified key from cache. Value will only be returned if + * its entry passed the optional filter provided. Filter check is atomic, and therefore the + * returned value is guaranteed to be consistent with the filter. The return value of {@code null} + * means entry did not pass the provided filter or cache has no mapping for the + * key. + * <p> + * If the value is not present in cache, then it will be looked up from swap storage. If + * it's not present in swap, or if swap is disabled, and if read-through is allowed, value + * will be loaded from {@link CacheStore} persistent storage via + * <code>CacheStore#load(Transaction, Object)</code> method. + * <h2 class="header">Transactions</h2> + * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. + * + * @param key Key for the value to get. + * @return Future for the get operation. + * @throws NullPointerException if the key is {@code null}. + */ + public IgniteInternalFuture<CacheEntry<K, V>> getEntryAsync(K key); + + /** * Retrieves values mapped to the specified keys from cache. Value will only be returned if * its entry passed the optional filter provided. Filter check is atomic, and therefore the * returned value is guaranteed to be consistent with the filter. If requested key-value pair @@ -377,6 +421,27 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException; /** + * Retrieves values mapped to the specified keys from cache. Value will only be returned if + * its entry passed the optional filter provided. Filter check is atomic, and therefore the + * returned value is guaranteed to be consistent with the filter. If requested key-value pair + * is not present in the returned map, then it means that its entry did not pass the provided + * filter or cache has no mapping for the key. + * <p> + * If some value is not present in cache, then it will be looked up from swap storage. If + * it's not present in swap, or if swap is disabled, and if read-through is allowed, value + * will be loaded from {@link CacheStore} persistent storage via + * <code>CacheStore#loadAll(Transaction, Collection, org.apache.ignite.lang.IgniteBiInClosure)</code> method. + * <h2 class="header">Transactions</h2> + * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. + * + * @param keys Keys to get. + * @return Map of key-value pairs. + * @throws IgniteCheckedException If get operation failed. + */ + public Collection<CacheEntry<K, V>> getEntries(@Nullable Collection<? extends K> keys) throws IgniteCheckedException; + + /** * Asynchronously retrieves values mapped to the specified keys from cache. Value will only be returned if * its entry passed the optional filter provided. Filter check is atomic, and therefore the * returned value is guaranteed to be consistent with the filter. If requested key-value pair @@ -397,6 +462,26 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys); /** + * Asynchronously retrieves values mapped to the specified keys from cache. Value will only be returned if + * its entry passed the optional filter provided. Filter check is atomic, and therefore the + * returned value is guaranteed to be consistent with the filter. If requested key-value pair + * is not present in the returned map, then it means that its entry did not pass the provided + * filter or cache has no mapping for the key. + * <p> + * If some value is not present in cache, then it will be looked up from swap storage. If + * it's not present in swap, or if swap is disabled, and if read-through is allowed, value + * will be loaded from {@link CacheStore} persistent storage via + * <code>CacheStore#loadAll(Transaction, Collection, org.apache.ignite.lang.IgniteBiInClosure)</code> method. + * <h2 class="header">Transactions</h2> + * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. + * + * @param keys Key for the value to get. + * @return Future for the get operation. + */ + public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(@Nullable Collection<? extends K> keys); + + /** * Stores given key-value pair in cache. If filters are provided, then entries will * be stored in cache only if they pass the filter. Note that filter check is atomic, * so value stored in cache is guaranteed to be consistent with the filters. If cache http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index 7efaf49..28c94dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -153,21 +153,6 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun } /** - * @param map Result map. - * @param key Key. - * @param val Value. - * @param ver Version. - */ - @SuppressWarnings("unchecked") - protected final void versionedResult(Map map, KeyCacheObject key, Object val, GridCacheVersion ver) { - assert needVer; - assert skipVals || val != null; - assert ver != null; - - map.put(key, new T2<>(skipVals ? true : val, ver)); - } - - /** * Affinity node to send get request to. * * @param affNodes All affinity nodes. http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 9cf8084..5be4e72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -617,6 +617,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param keys {@inheritDoc} * @param forcePrimary {@inheritDoc} * @param skipTx {@inheritDoc} + * @param needVer Need version. * @return {@inheritDoc} */ @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( @@ -627,7 +628,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap String taskName, boolean deserializeBinary, boolean skipVals, - boolean canRemap + boolean canRemap, + boolean needVer ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -640,7 +642,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap forcePrimary, null, skipVals, - canRemap); + canRemap, + needVer); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index cb8c842..c926c13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -383,7 +383,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col /*deserialize binary*/false, skipVals, /*keep cache objects*/true, - /*skip store*/!readThrough); + /*skip store*/!readThrough, + false); } } else { @@ -413,7 +414,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col /*deserialize binary*/false, skipVals, /*keep cache objects*/true, - /*skip store*/!readThrough); + /*skip store*/!readThrough, + false); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index d8b2f37..41b28d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -946,7 +946,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (retVal || !F.isEmpty(e.entryProcessors()) || !F.isEmpty(e.filters()) || - e.serializableReadVersion() != null) { + e.entryReadVersion() != null) { if (map == null) map = new HashMap<>(); @@ -1013,7 +1013,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter throws IgniteCheckedException { try { for (IgniteTxEntry entry : entries) { - GridCacheVersion serReadVer = entry.serializableReadVersion(); + GridCacheVersion serReadVer = entry.entryReadVersion(); if (serReadVer != null) { entry.cached().unswap(); http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 1f2d7c5..2c9a760 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -490,17 +490,14 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda cache.removeIfObsolete(key); } else { - if (needVer) - versionedResult(locVals, key, v, ver); - else { - cctx.addResult(locVals, - key, - v, - skipVals, - keepCacheObjects, - deserializeBinary, - true); - } + cctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObjects, + deserializeBinary, + true, + ver); return true; } @@ -552,17 +549,14 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda for (GridCacheEntryInfo info : infos) { assert skipVals == (info.value() == null); - if (needVer) - versionedResult(map, info.key(), info.value(), info.version()); - else { - cctx.addResult(map, - info.key(), - info.value(), - skipVals, - keepCacheObjects, - deserializeBinary, - false); - } + cctx.addResult(map, + info.key(), + info.value(), + skipVals, + keepCacheObjects, + deserializeBinary, + false, + needVer ? info.version() : null); } return map; http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 0c811ae..01e61bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -625,20 +625,13 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im assert !skipVals; if (val != null) { - if (needVer) { - assert ver != null; + if (!keepCacheObjects) { + Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary); - onDone(new T2<>(val, ver)); - } - else { - if (!keepCacheObjects) { - Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary); - - onDone(res); - } - else - onDone(val); + onDone(needVer ? new T2<>(res, ver) : res); } + else + onDone(needVer ? new T2<>(val, ver) : val); } else onDone(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index cba4e61..b806906 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -93,6 +93,7 @@ import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -317,7 +318,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected V get(K key, String taskName, boolean deserializeBinary) throws IgniteCheckedException { + @Override protected V get(K key, String taskName, boolean deserializeBinary, boolean needVer) + throws IgniteCheckedException { ctx.checkSecurity(SecurityPermission.CACHE_READ); if (keyCheck) @@ -339,7 +341,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { expiryPlc, false, skipStore, - true).get(); + true, + needVer).get(); } /** {@inheritDoc} */ @@ -350,7 +353,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final String taskName, final boolean deserializeBinary, final boolean skipVals, - final boolean canRemap) { + final boolean canRemap, + final boolean needVer) { ctx.checkSecurity(SecurityPermission.CACHE_READ); if (keyCheck) @@ -376,7 +380,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { expiryPlc, skipVals, skipStore, - canRemap); + canRemap, + needVer); } }); } @@ -390,7 +395,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final String taskName, final boolean deserializeBinary, final boolean skipVals, - final boolean canRemap + final boolean canRemap, + final boolean needVer ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -420,7 +426,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { expiryPlc, skipVals, skipStore, - canRemap); + canRemap, + needVer); } }); } @@ -1098,6 +1105,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param skipVals Skip values flag. * @param skipStore Skip store flag. * @param canRemap Can remap flag. + * @param needVer Need version. * @return Get future. */ private IgniteInternalFuture<V> getAsync0(KeyCacheObject key, @@ -1108,7 +1116,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable ExpiryPolicy expiryPlc, boolean skipVals, boolean skipStore, - boolean canRemap + boolean canRemap, + boolean needVer ) { AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion(); @@ -1126,7 +1135,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { expiry, skipVals, canRemap, - false, + needVer, false); fut.init(); @@ -1145,6 +1154,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. * @param skipStore Skip store flag. + * @param needVer Need version. * @return Get future. */ private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys, @@ -1155,7 +1165,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable ExpiryPolicy expiryPlc, boolean skipVals, boolean skipStore, - boolean canRemap + boolean canRemap, + boolean needVer ) { AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion(); @@ -1180,19 +1191,42 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (entry != null) { boolean isNew = entry.isNewLocked(); - CacheObject v = entry.innerGet(null, - /*swap*/true, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiry, - !deserializeBinary); + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiry, + true); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = entry.innerGet(null, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiry, + !deserializeBinary); + } // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { @@ -1204,7 +1238,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { success = false; } else - ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true); + ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true, ver); } else success = false; @@ -1256,7 +1290,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { expiry, skipVals, canRemap, - false, + needVer, false); fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 073043d..dc4b6bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -200,7 +200,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte String taskName, final boolean deserializeBinary, final boolean skipVals, - boolean canRemap) { + boolean canRemap, + final boolean needVer) { ctx.checkSecurity(SecurityPermission.CACHE_READ); if (keyCheck) @@ -218,7 +219,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte deserializeBinary, skipVals, false, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + needVer); return fut.chain(new CX1<IgniteInternalFuture<Map<Object, Object>>, V>() { @SuppressWarnings("unchecked") @@ -258,7 +260,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), skipVals, canRemap, - /*needVer*/false, + needVer, /*keepCacheObjects*/false); fut.init(); @@ -275,7 +277,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte String taskName, final boolean deserializeBinary, final boolean skipVals, - boolean canRemap + boolean canRemap, + final boolean needVer ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -297,7 +300,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte deserializeBinary, skipVals, false, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + needVer); } }, opCtx); } @@ -318,7 +322,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte deserializeBinary, skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), skipVals, - canRemap); + canRemap, + needVer); } /** {@inheritDoc} */ @@ -345,6 +350,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. * @param canRemap Can remap flag. + * @param needVer Need version. * @return Loaded values. */ public IgniteInternalFuture<Map<K, V>> loadAsync( @@ -357,7 +363,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean deserializeBinary, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, - boolean canRemap) { + boolean canRemap, + boolean needVer) { return loadAsync(keys, readThrough, forcePrimary, @@ -367,7 +374,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte expiryPlc, skipVals, canRemap, - false, + needVer, false); } @@ -522,17 +529,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (locVals == null) locVals = U.newHashMap(keys.size()); - if (needVer) - locVals.put((K)key, (V)new T2<>((Object)(skipVals ? true : v), ver)); - else { - ctx.addResult(locVals, - key, - v, - skipVals, - keepCacheObj, - deserializeBinary, - true); - } + ctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObj, + deserializeBinary, + true, + ver); } } else http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index a2d5adb..63c073d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -400,7 +400,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { String taskName, boolean deserializeBinary, boolean skipVals, - boolean canRemap + boolean canRemap, + boolean needVer ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -423,7 +424,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { skipVals ? null : opCtx != null ? opCtx.expiry() : null, skipVals, opCtx != null && opCtx.skipStore(), - canRemap); + canRemap, + needVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 5bf18d9..c750be6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -230,6 +230,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda * @param skipVal Skip value flag. * @param skipStore Skip store flag. * @param canRemap Can remap flag. + * @param needVer Need version. * @return Loaded values. */ public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable IgniteInternalTx tx, @@ -241,7 +242,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda @Nullable ExpiryPolicy expiryPlc, boolean skipVal, boolean skipStore, - boolean canRemap + boolean canRemap, + boolean needVer ) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -261,7 +263,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda expiry, skipVal, canRemap, - false, + needVer, false); // init() will register future for responses if future has remote mappings. http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index c0a1617..026fb4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -350,7 +350,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { null, false, /*skip store*/false, - /*can remap*/true + /*can remap*/true, + false ).get().get(keyValue(false)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 9291001..06fc0a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -650,26 +650,25 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap */ @SuppressWarnings("unchecked") private void addResult(KeyCacheObject key, CacheObject v, GridCacheVersion ver) { - if (needVer) { - V val0 = (V)new T2<>(skipVals ? true : v, ver); + if (keepCacheObjects) { + K key0 = (K)key; + V val0 = needVer ? + (V)new T2<>(skipVals ? true : v, ver) : + (V)(skipVals ? true : v); - add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0))); + add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } else { - if (keepCacheObjects) { - K key0 = (K)key; - V val0 = (V)(skipVals ? true : v); - - add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); - } - else { - K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false); - V val0 = !skipVals ? + K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false); + V val0 = needVer ? + (V)new T2<>(!skipVals ? + (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) : + (V)Boolean.TRUE, ver) : + !skipVals ? (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) : (V)Boolean.TRUE; - add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); - } + add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } } @@ -741,16 +740,14 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap assert skipVals == (info.value() == null); - if (needVer) - versionedResult(map, key, val, info.version()); - else - cctx.addResult(map, - key, - val, - skipVals, - keepCacheObjects, - deserializeBinary, - false); + cctx.addResult(map, + key, + val, + skipVals, + keepCacheObjects, + deserializeBinary, + false, + needVer ? info.version() : null); } catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 4f9f227..52ebfc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -107,7 +107,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim if (txEntry != null) { if (entry.context().isLocal()) { - GridCacheVersion serReadVer = txEntry.serializableReadVersion(); + GridCacheVersion serReadVer = txEntry.entryReadVersion(); if (serReadVer != null) { GridCacheContext ctx = entry.context(); http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index bae0327..b968e57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -279,6 +279,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param topLocked {@code True} if thread already acquired lock preventing topology change. */ private void prepareSingle(IgniteTxEntry write, boolean topLocked) { + write.clearEntryReadVersion(); + AffinityTopologyVersion topVer = tx.topologyVersion(); assert topVer.topologyVersion() > 0; @@ -339,6 +341,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>(); for (IgniteTxEntry write : writes) { + write.clearEntryReadVersion(); + GridDistributedTxMapping updated = map(write, topVer, cur, topLocked); if (cur != updated) {