http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 8170008..615a92b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -176,6 +176,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA txMapping = new GridDhtTxMapping(); for (IgniteTxEntry txEntry : tx.allEntries()) { + txEntry.clearEntryReadVersion(); + GridCacheContext cacheCtx = txEntry.context(); List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index a09dec0..a3130cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -122,7 +122,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> String taskName, final boolean deserializeBinary, final boolean skipVals, - boolean canRemap + boolean canRemap, + final boolean needVer ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -146,7 +147,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> deserializeBinary, skipVals, false, - skipStore); + skipStore, + needVer); } }, opCtx); } @@ -162,7 +164,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> skipVals ? null : opCtx != null ? 
opCtx.expiry() : null, skipVals, skipStore, - canRemap); + canRemap, + needVer); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 6130ead..fed3e33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -477,7 +477,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override @Nullable public V get(K key, boolean deserializeBinary) throws IgniteCheckedException { + @Override @Nullable public V get(K key, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { String taskName = ctx.kernalContext().job().currentTaskName(); Map<K, V> m = getAllInternal(Collections.singleton(key), @@ -485,7 +485,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ctx.readThrough(), taskName, deserializeBinary, - false); + false, + needVer); assert m.isEmpty() || m.size() == 1 : m.size(); @@ -494,7 +495,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public final Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary) + @Override public final Map<K, V> getAll(Collection<? 
extends K> keys, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { A.notNull(keys, "keys"); @@ -505,7 +506,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ctx.readThrough(), taskName, deserializeBinary, - false); + false, + needVer); } @@ -519,7 +521,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { final String taskName, final boolean deserializeBinary, final boolean skipVals, - boolean canRemap + boolean canRemap, + final boolean needVer ) { A.notNull(keys, "keys"); @@ -528,7 +531,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return asyncOp(new Callable<Map<K, V>>() { @Override public Map<K, V> call() throws Exception { - return getAllInternal(keys, swapOrOffheap, storeEnabled, taskName, deserializeBinary, skipVals); + return getAllInternal(keys, swapOrOffheap, storeEnabled, taskName, deserializeBinary, skipVals, needVer); } }); } @@ -542,6 +545,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { * @param taskName Task name. * @param deserializeBinary Deserialize binary . * @param skipVals Skip value flag. + * @param needVer Need version. * @return Key-value map. * @throws IgniteCheckedException If failed. */ @@ -551,7 +555,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { boolean storeEnabled, String taskName, boolean deserializeBinary, - boolean skipVals + boolean skipVals, + boolean needVer ) throws IgniteCheckedException { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -584,24 +589,65 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { entry = swapOrOffheap ? 
entryEx(cacheKey) : peekEx(cacheKey); if (entry != null) { - CacheObject v = entry.innerGet(null, - /*swap*/swapOrOffheap, - /*read-through*/false, - /*fail-fast*/false, - /*unmarshal*/true, - /**update-metrics*/true, - /**event*/!skipVals, - /**temporary*/false, - subjId, - null, - taskName, - expiry, - !deserializeBinary); + CacheObject v ; + GridCacheVersion ver; - if (v != null) - ctx.addResult(vals, cacheKey, v, skipVals, false, deserializeBinary, true); - else - success = false; + if (needVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, + /*swap*/swapOrOffheap, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiry, + !deserializeBinary); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + + ctx.addResult( + vals, + cacheKey, + v, + skipVals, + false, + deserializeBinary, + true, + ver); + }else + success = false; + } + else { + v = entry.innerGet(null, + /*swap*/swapOrOffheap, + /*read-through*/false, + /*fail-fast*/false, + /*unmarshal*/true, + /**update-metrics*/true, + /**event*/!skipVals, + /**temporary*/false, + subjId, + null, + taskName, + expiry, + !deserializeBinary); + + if (v != null) { + ctx.addResult(vals, + cacheKey, + v, + skipVals, + false, + deserializeBinary, + true); + } + else + success = false; + } } else { if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals) @@ -638,7 +684,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /*force primary*/false, expiry, skipVals, - /*can remap*/true).get(); + /*can remap*/true, + needVer).get(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index f731975..8b871a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -25,6 +25,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; +import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; @@ -73,6 +74,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Dummy version for any existing entry read in SERIALIZABLE transaction. */ public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1); + /** */ + public static final GridCacheVersion GET_ENTRY_INVALID_VER_UPDATED = new GridCacheVersion(0, 0, 0, 2); + + /** */ + public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 0, 3); + /** Prepared flag updater. */ private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD = AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared"); @@ -918,13 +925,30 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { } /** - * @param serReadVer Read version for serializable transaction. + * Gets stored entry version. Version is stored for all entries in serializable transaction or + * when value is read using {@link IgniteCache#getEntry(Object)} method. + * + * @return Entry version. 
+ */ + @Nullable public GridCacheVersion entryReadVersion() { + return serReadVer; + } + + /** + * @param ver Entry version. */ - public void serializableReadVersion(GridCacheVersion serReadVer) { + public void entryReadVersion(GridCacheVersion ver) { assert this.serReadVer == null; - assert serReadVer != null; + assert ver != null; - this.serReadVer = serReadVer; + this.serReadVer = ver; + } + + /** + * Clears recorded read version, should be done before starting commit of not serializable/optimistic transaction. + */ + public void clearEntryReadVersion() { + serReadVer = null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index aad9841..a999358 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -1342,7 +1343,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** * Checks if there is a cached or swapped value for - * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method. + * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean, boolean)} method. * * @param cacheCtx Cache context. * @param keys Key to enlist. @@ -1368,7 +1369,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig boolean deserializeBinary, boolean skipVals, boolean keepCacheObjects, - boolean skipStore + boolean skipStore, + final boolean needVer ) throws IgniteCheckedException { assert !F.isEmpty(keys); assert keysCnt == keys.size(); @@ -1381,7 +1383,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig AffinityTopologyVersion topVer = topologyVersion(); - boolean needReadVer = serializable() && optimistic(); + boolean needReadVer = (serializable() && optimistic()) || needVer; // In this loop we cover only read-committed or optimistic transactions. // Transactions that are pessimistic and not read-committed are covered @@ -1403,31 +1405,89 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (!F.isEmpty(txEntry.entryProcessors())) val = txEntry.applyEntryProcessors(val); - if (val != null) - cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false); + if (val != null) { + GridCacheVersion ver = null; + + if (needVer) { + if (txEntry.op() != READ) + ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED; + else { + ver = txEntry.entryReadVersion(); + + if (ver == null && pessimistic()) { + while (true) { + try { + GridCacheEntryEx cached = txEntry.cached(); + + ver = cached.isNear() ? 
+ ((GridNearCacheEntry)cached).dhtVersion() : cached.version(); + + break; + } + catch (GridCacheEntryRemovedException rmvdErr) { + txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer)); + } + } + } + + if (ver == null) { + assert optimistic() && repeatableRead() : this; + + ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET; + } + } + + assert ver != null; + } + + cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false, ver); + } } else { assert txEntry.op() == TRANSFORM; while (true) { try { + GridCacheVersion readVer = null; + Object transformClo = - (txEntry.op() == TRANSFORM && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? + (txEntry.op() == TRANSFORM && + cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? F.first(txEntry.entryProcessors()) : null; - val = txEntry.cached().innerGet(this, - /*swap*/true, - /*read-through*/false, - /*fail fast*/true, - /*unmarshal*/true, - /*metrics*/true, - /*event*/!skipVals, - /*temporary*/false, - CU.subjectId(this, cctx), - transformClo, - resolveTaskName(), - null, - txEntry.keepBinary()); + if (needVer) { + T2<CacheObject, GridCacheVersion> res = txEntry.cached().innerGetVersioned( + this, + /*swap*/true, + /*unmarshal*/true, + /*update-metrics*/true, + /*event*/!skipVals, + CU.subjectId(this, cctx), + transformClo, + resolveTaskName(), + null, + txEntry.keepBinary()); + + if (res != null) { + val = res.get1(); + readVer = res.get2(); + } + } + else { + val = txEntry.cached().innerGet(this, + /*swap*/true, + /*read-through*/false, + /*fail fast*/true, + /*unmarshal*/true, + /*metrics*/true, + /*event*/!skipVals, + /*temporary*/false, + CU.subjectId(this, cctx), + transformClo, + resolveTaskName(), + null, + txEntry.keepBinary()); + } if (val != null) { if (!readCommitted() && !skipVals) @@ -1442,7 +1502,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig skipVals, keepCacheObjects, deserializeBinary, - false); + false, + readVer); } 
else missed.put(key, txEntry.cached().version()); @@ -1517,7 +1578,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig skipVals, keepCacheObjects, deserializeBinary, - false); + false, + needVer ? readVer : null); } else missed.put(key, ver); @@ -1549,7 +1611,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (needReadVer) { assert readVer != null; - txEntry.serializableReadVersion(readVer); + txEntry.entryReadVersion(readVer); } } } @@ -1600,7 +1662,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** * Loads all missed keys for - * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method. + * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean, boolean)} method. * * @param cacheCtx Cache context. * @param map Return map. @@ -1618,12 +1680,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig final boolean deserializeBinary, final boolean skipVals, final boolean keepCacheObjects, - final boolean skipStore + final boolean skipStore, + final boolean needVer + ) { if (log.isDebugEnabled()) log.debug("Loading missed values for missed map: " + missedMap); - final boolean needReadVer = serializable() && optimistic(); + final boolean needReadVer = (serializable() && optimistic()) || needVer; return new GridEmbeddedFuture<>( new C2<Void, Exception, Map<K, V>>() { @@ -1685,7 +1749,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig skipVals, keepCacheObjects, deserializeBinary, - false); + false, + needVer ? 
loadVer : null); } } else { @@ -1696,7 +1761,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (needReadVer) { assert loadVer != null; - txEntry.serializableReadVersion(loadVer); + txEntry.entryReadVersion(loadVer); } if (visibleVal != null) { @@ -1706,7 +1771,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig skipVals, keepCacheObjects, deserializeBinary, - false); + false, + needVer ? loadVer : null); } } } @@ -1715,13 +1781,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync( final GridCacheContext cacheCtx, Collection<KeyCacheObject> keys, final boolean deserializeBinary, final boolean skipVals, final boolean keepCacheObjects, - final boolean skipStore) { + final boolean skipStore, + final boolean needVer) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -1751,7 +1819,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig deserializeBinary, skipVals, keepCacheObjects, - skipStore); + skipStore, + needVer); if (single && missed.isEmpty()) return new GridFinishedFuture<>(retMap); @@ -1797,25 +1866,48 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig while (true) { GridCacheEntryEx cached = txEntry.cached(); + CacheObject val = null; + GridCacheVersion readVer = null; + try { Object transformClo = (!F.isEmpty(txEntry.entryProcessors()) && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? 
F.first(txEntry.entryProcessors()) : null; - CacheObject val = cached.innerGet(IgniteTxLocalAdapter.this, - cacheCtx.isSwapOrOffheapEnabled(), - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /*metrics*/true, - /*events*/!skipVals, - /*temporary*/true, - CU.subjectId(IgniteTxLocalAdapter.this, cctx), - transformClo, - resolveTaskName(), - null, - txEntry.keepBinary()); + if (needVer) { + T2<CacheObject, GridCacheVersion> res = cached.innerGetVersioned( + IgniteTxLocalAdapter.this, + /*swap*/cacheCtx.isSwapOrOffheapEnabled(), + /*unmarshal*/true, + /**update-metrics*/true, + /*event*/!skipVals, + CU.subjectId(IgniteTxLocalAdapter.this, cctx), + transformClo, + resolveTaskName(), + null, + txEntry.keepBinary()); + + if (res != null) { + val = res.get1(); + readVer = res.get2(); + } + } + else{ + val = cached.innerGet(IgniteTxLocalAdapter.this, + cacheCtx.isSwapOrOffheapEnabled(), + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /*metrics*/true, + /*events*/!skipVals, + /*temporary*/true, + CU.subjectId(IgniteTxLocalAdapter.this, cctx), + transformClo, + resolveTaskName(), + null, + txEntry.keepBinary()); + } // If value is in cache and passed the filter. 
if (val != null) { @@ -1832,7 +1924,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig skipVals, keepCacheObjects, deserializeBinary, - false); + false, + readVer); + + if (readVer != null) + txEntry.entryReadVersion(readVer); } // Even though we bring the value back from lock acquisition, @@ -1858,7 +1954,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig deserializeBinary, skipVals, keepCacheObjects, - skipStore); + skipStore, + needVer); } return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -1924,7 +2021,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig deserializeBinary, skipVals, keepCacheObjects, - skipStore); + skipStore, + needVer); } return new GridFinishedFuture<>(retMap); @@ -2315,7 +2413,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (needReadVer) { assert loadVer != null; - e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); + e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); } if (singleRmv) { @@ -2508,7 +2606,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (needReadVer) { assert readVer != null; - txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); + txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); } } @@ -2561,7 +2659,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (needReadVer) { assert readVer != null; - txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); + txEntry.entryReadVersion(singleRmv ? 
SER_READ_NOT_EMPTY_VER : readVer); } if (retval && !transform) http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index a5d3373..78f517c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -76,7 +76,8 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { boolean deserializeBinary, boolean skipVals, boolean keepCacheObjects, - boolean skipStore); + boolean skipStore, + boolean needVer); /** * @param cacheCtx Cache context. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 3e43726..28a7cc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1419,7 +1419,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert !entry1.detached() : "Expected non-detached entry for near transaction " + "[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']'; - GridCacheVersion serReadVer = txEntry1.serializableReadVersion(); + GridCacheVersion serReadVer = txEntry1.entryReadVersion(); assert serReadVer == null || (tx.optimistic() && tx.serializable()) : txEntry1; http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java new file mode 100644 index 0000000..c0ba42c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java @@ -0,0 +1,803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.Callable; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; +import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.jetbrains.annotations.Nullable; + +import static 
org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.LOCAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Test getEntry and getEntries methods. + */ +public abstract class CacheGetEntryAbstractTest extends GridCacheAbstractSelfTest { + /** */ + private static final String UPDATED_ENTRY_ERR = "Impossible to get version for entry updated in transaction"; + + /** */ + private static final String ENTRY_AFTER_GET_ERR = "Impossible to get entry version after get()"; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** + * @return Transaction concurrency. + */ + abstract protected TransactionConcurrency concurrency(); + + /** + * + * @return Transaction isolation. + */ + abstract protected TransactionIsolation isolation(); + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 60_000; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(null); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testNear() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(ATOMIC); + cfg.setName("near"); + cfg.setNearConfiguration(new NearCacheConfiguration()); + + test(cfg); + } + + /** + * @throws Exception If failed. + */ + public void testNearTransactional() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setName("nearT"); + cfg.setNearConfiguration(new NearCacheConfiguration()); + + test(cfg); + } + + /** + * @throws Exception If failed. + */ + public void testPartitioned() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(ATOMIC); + cfg.setName("partitioned"); + + test(cfg); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedTransactional() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setName("partitionedT"); + + test(cfg); + } + + /** + * @throws Exception If failed. + */ + public void testLocal() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setCacheMode(LOCAL); + cfg.setAtomicityMode(ATOMIC); + cfg.setName("local"); + + test(cfg); + } + + /** + * @throws Exception If failed. + */ + public void testLocalTransactional() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setCacheMode(LOCAL); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setName("localT"); + + test(cfg); + } + + /** + * @throws Exception If failed. 
+ */ + public void testReplicated() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setCacheMode(REPLICATED); + cfg.setAtomicityMode(ATOMIC); + cfg.setName("replicated"); + + test(cfg); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedTransactional() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setCacheMode(REPLICATED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setName("replicatedT"); + + test(cfg); + } + + /** + * @param cfg Cache configuration. + * @throws Exception If failed. + */ + private void test(CacheConfiguration cfg) throws Exception { + test(cfg, true); + + test(cfg, false); + } + + /** + * @param cfg Cache configuration. + * @param oneEntry If {@code true} then single entry is tested. + * @throws Exception If failed. + */ + private void test(CacheConfiguration cfg, final boolean oneEntry) throws Exception { + final IgniteCache<Integer, TestValue> cache = grid(0).createCache(cfg); + + try { + init(cache); + + test(cache, null, null, null, oneEntry); + + if (cfg.getAtomicityMode() == TRANSACTIONAL) { + TransactionConcurrency txConcurrency = concurrency(); + TransactionIsolation txIsolation = isolation(); + + try (Transaction tx = grid(0).transactions().txStart(txConcurrency, txIsolation)) { + initTx(cache); + + test(cache, txConcurrency, txIsolation, tx, oneEntry); + + tx.commit(); + } + + testConcurrentTx(cache, OPTIMISTIC, REPEATABLE_READ, oneEntry); + testConcurrentTx(cache, OPTIMISTIC, READ_COMMITTED, oneEntry); + + testConcurrentTx(cache, PESSIMISTIC, REPEATABLE_READ, oneEntry); + testConcurrentTx(cache, PESSIMISTIC, READ_COMMITTED, oneEntry); + } + } + finally { + cache.destroy(); + } + } + + /** + * @param cache Cache. + * @param txConcurrency Transaction concurrency. + * @param txIsolation Transaction isolation. 
+ * @param oneEntry If {@code true} then single entry is tested. + * @throws Exception If failed. + */ + private void testConcurrentTx(final IgniteCache<Integer, TestValue> cache, + final TransactionConcurrency txConcurrency, + final TransactionIsolation txIsolation, + final boolean oneEntry) throws Exception { + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = grid(0).transactions(); + + long stopTime = System.currentTimeMillis() + 3000; + + while (System.currentTimeMillis() < stopTime) { + Set<Integer> keys = new LinkedHashSet<>(); + + for (int i = 0; i < 100; i++) + keys.add(i); + + try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { + if (oneEntry) { + for (int i = 0; i < 100; i++) + cache.getEntry(i); + } + else + cache.getEntries(keys); + + for (int i = 0; i < 100; i++) + cache.put(i, new TestValue(i)); + + tx.commit(); + } + } + + return null; + } + }, 10, "tx-thread"); + } + + /** + * @param base Start value. + * @return Keys. + */ + private Set<Integer> getKeys(int base) { + int start = 0; + int finish = 100; + + Set<Integer> keys = new HashSet<>(finish - start); + + for (int i = base + start; i < base + finish; ++i) + keys.add(i); + + return keys; + } + + /** + * @return Keys. + */ + private Set<Integer> createdBeforeTxKeys() { + return getKeys(0); + } + + /** + * @return Keys. + */ + private Set<Integer> createdBeforeTxWithBinaryKeys() { + return getKeys(1_000); + } + + /** + * @return Keys. + */ + private Set<Integer> createdBeforeTxKeys2() { + return getKeys(2_000); + } + + /** + * @return Keys. + */ + private Set<Integer> createdBeforeTxWithBinaryKeys2() { + return getKeys(3_000); + } + + /** + * @return Keys. + */ + private Set<Integer> createdBeforeTxKeys3() { + return getKeys(4_000); + } + + /** + * @return Keys. + */ + private Set<Integer> createdBeforeTxWithBinaryKeys3() { + return getKeys(5_000); + } + + /** + * @return Keys. 
+ */ + private Set<Integer> removedBeforeTxKeys() { + return getKeys(6_000); + } + + /** + * @return Keys. + */ + private Set<Integer> removedBeforeTxWithBinaryKeys() { + return getKeys(7_000); + } + + /** + * @return Keys. + */ + private Set<Integer> createdAtTxKeys() { + return getKeys(8_000); + } + + /** + * @return Keys. + */ + private Set<Integer> createdAtTxWithBinaryKeys() { + return getKeys(9_000); + } + + /** + * @return Keys. + */ + private Set<Integer> removedAtTxKeys() { + return getKeys(10_000); + } + + /** + * @return Keys. + */ + private Set<Integer> removedAtTxWithBinaryKeys() { + return getKeys(11_000); + } + + /** + * @param cache Cache. + */ + private void init(IgniteCache<Integer, TestValue> cache) { + Set<Integer> keys = new HashSet<>(); + + keys.addAll(createdBeforeTxKeys()); + keys.addAll(createdBeforeTxWithBinaryKeys()); + keys.addAll(createdBeforeTxKeys2()); + keys.addAll(createdBeforeTxWithBinaryKeys2()); + keys.addAll(createdBeforeTxKeys3()); + keys.addAll(createdBeforeTxWithBinaryKeys3()); + keys.addAll(removedBeforeTxKeys()); + keys.addAll(removedBeforeTxWithBinaryKeys()); + keys.addAll(removedAtTxKeys()); + keys.addAll(removedAtTxWithBinaryKeys()); + + for (int i : keys) + cache.put(i, new TestValue(i)); + + for (int i : removedBeforeTxKeys()) + cache.remove(i); + + for (int i : removedBeforeTxWithBinaryKeys()) + cache.remove(i); + } + + /** + * @param cache Cache. + */ + private void initTx(IgniteCache<Integer, TestValue> cache) { + for (int i : createdAtTxKeys()) + cache.put(i, new TestValue(i)); + + for (int i : createdAtTxWithBinaryKeys()) + cache.put(i, new TestValue(i)); + + for (int i : removedAtTxKeys()) + cache.remove(i); + + for (int i : removedAtTxWithBinaryKeys()) + cache.remove(i); + } + + /** + * @param e Entry. + * @param cache Cache. + * @throws Exception If failed. 
+ */ + private void compareVersionWithPrimaryNode(CacheEntry<Integer, ?> e, IgniteCache<Integer, TestValue> cache) + throws Exception { + CacheConfiguration cfg = cache.getConfiguration(CacheConfiguration.class); + + if (cfg.getCacheMode() != LOCAL) { + Ignite prim = primaryNode(e.getKey(), cache.getName()); + + GridCacheAdapter<Object, Object> cacheAdapter = ((IgniteKernal)prim).internalCache(cache.getName()); + + if (cfg.getNearConfiguration() != null) + cacheAdapter = ((GridNearCacheAdapter)cacheAdapter).dht(); + + IgniteCacheObjectProcessor cacheObjects = cacheAdapter.context().cacheObjects(); + + CacheObjectContext cacheObjCtx = cacheAdapter.context().cacheObjectContext(); + + GridCacheMapEntry mapEntry = cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject( + cacheObjCtx, e.getKey(), true)); + + assertNotNull("No entry for key: " + e.getKey(), mapEntry); + assertEquals(mapEntry.version(), e.version()); + } + } + + /** + * @param cache Cache. + * @param i Key. + * @param oneEntry If {@code true} then single entry is tested. + * @param getVerErr Not null error if entry version access should fail. + * @param expKeys Expected keys with values. + * @throws Exception If failed. 
+ */ + private void checkData(IgniteCache<Integer, TestValue> cache, + int i, + boolean oneEntry, + @Nullable String getVerErr, + Set<Integer> expKeys) throws Exception { + if (oneEntry) { + final CacheEntry<Integer, TestValue> e = cache.getEntry(i); + + if (getVerErr == null) + compareVersionWithPrimaryNode(e, cache); + else { + Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + e.version(); + + return null; + } + }, IgniteException.class, null); + + assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr)); + } + + assertEquals(e.getValue().val, i); + } + else { + Set<Integer> set = new HashSet<>(); + + int expCnt = 0; + + for (int j = 0; j < 10; j++) { + Integer key = i + j; + + set.add(key); + + if (expKeys.contains(key)) + expCnt++; + } + + Collection<CacheEntry<Integer, TestValue>> entries = cache.getEntries(set); + + assertEquals(expCnt, entries.size()); + + for (final CacheEntry<Integer, TestValue> e : entries) { + if (getVerErr == null) + compareVersionWithPrimaryNode(e, cache); + else { + Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + e.version(); + + return null; + } + }, IgniteException.class, null); + + assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr)); + } + + assertEquals((Integer)e.getValue().val, e.getKey()); + + assertTrue(set.contains(e.getValue().val)); + } + } + } + + /** + * @param cache Cache. + * @param i Key. + * @param oneEntry If {@code true} then single entry is tested. + * @param getVerErr Not null error if entry version access should fail. + * @param expKeys Expected keys with values. + * @throws Exception If failed. 
+ */ + private void checkBinaryData(IgniteCache<Integer, TestValue> cache, + int i, + boolean oneEntry, + @Nullable String getVerErr, + Set<Integer> expKeys) throws Exception { + IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary(); + + if (oneEntry) { + final CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i); + + if (getVerErr == null) + compareVersionWithPrimaryNode(e, cache); + else { + Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + e.version(); + + return null; + } + }, IgniteException.class, null); + + assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr)); + } + + assertEquals(((TestValue)e.getValue().deserialize()).val, i); + } + else { + Set<Integer> set = new HashSet<>(); + + int expCnt = 0; + + for (int j = 0; j < 10; j++) { + Integer key = i + j; + + set.add(key); + + if (expKeys.contains(key)) + expCnt++; + } + + Collection<CacheEntry<Integer, BinaryObject>> entries = cacheB.getEntries(set); + + assertEquals(expCnt, entries.size()); + + for (final CacheEntry<Integer, BinaryObject> e : entries) { + if (getVerErr == null) + compareVersionWithPrimaryNode(e, cache); + else { + Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + e.version(); + + return null; + } + }, IgniteException.class, null); + + assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr)); + } + + TestValue tv = e.getValue().deserialize(); + + assertEquals((Integer)tv.val, e.getKey()); + + assertTrue(set.contains((tv).val)); + } + } + } + + /** + * @param cache Cache. + * @param i Key. + * @param oneEntry If {@code true} then single entry is tested. 
+ */ + private void checkRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) { + if (oneEntry) { + CacheEntry<Integer, TestValue> e = cache.getEntry(i); + + assertNull(e); + } + else { + Set<Integer> set = new HashSet<>(); + + for (int j = 0; j < 10; j++) + set.add(i + j); + + Collection<CacheEntry<Integer, TestValue>> es = cache.getEntries(set); + + assertTrue(es.isEmpty()); + } + } + + /** + * @param cache Cache. + * @param i Key. + * @param oneEntry If {@code true} then single entry is tested. + */ + private void checkBinaryRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) { + IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary(); + + if (oneEntry) { + CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i); + + assertNull(e); + } + else { + Set<Integer> set = new HashSet<>(); + + for (int j = 0; j < 10; j++) + set.add(i + j); + + Collection<CacheEntry<Integer, BinaryObject>> es = cacheB.getEntries(set); + + assertTrue(es.isEmpty()); + } + } + + /** + * @param cache Cache. + * @param txConcurrency Transaction concurrency. + * @param txIsolation Transaction isolation. + * @param tx Transaction. + * @param oneEntry If {@code true} then single entry is tested. + * @throws Exception If failed. 
+ */ + private void test(IgniteCache<Integer, TestValue> cache, + TransactionConcurrency txConcurrency, + TransactionIsolation txIsolation, + Transaction tx, + boolean oneEntry) throws Exception { + if (tx == null) { + Set<Integer> keys = createdBeforeTxKeys(); + + for (int i : keys) + checkData(cache, i, oneEntry, null, keys); + + keys = createdBeforeTxWithBinaryKeys(); + + for (int i : keys) + checkBinaryData(cache, i, oneEntry, null, keys); + + for (int i : removedBeforeTxKeys()) + checkRemoved(cache, i, oneEntry); + + for (int i : removedBeforeTxWithBinaryKeys()) + checkBinaryRemoved(cache, i, oneEntry); + } + else { + Set<Integer> keys = createdBeforeTxKeys2(); + + for (int i : keys) { + checkData(cache, i, oneEntry, null, keys); + checkData(cache, i, oneEntry, null, keys); + } + + keys = createdBeforeTxWithBinaryKeys2(); + + for (int i : keys) { + checkBinaryData(cache, i, oneEntry, null, keys); + checkBinaryData(cache, i, oneEntry, null, keys); + } + + String verGetErr = null; + + if (txConcurrency == OPTIMISTIC && txIsolation == REPEATABLE_READ) + verGetErr = ENTRY_AFTER_GET_ERR; + + keys = createdBeforeTxKeys3(); + + for (int i : keys) { + if (oneEntry) + cache.get(i); + else { + Set<Integer> set = new HashSet<>(); + + for (int j = 0; j < 10; j++) + set.add(i + j); + + cache.getAll(set); + } + + checkData(cache, i, oneEntry, verGetErr, keys); + } + + keys = createdBeforeTxWithBinaryKeys3(); + + for (int i : keys) { + if (oneEntry) + cache.get(i); + else { + Set<Integer> set = new HashSet<>(); + + for (int j = 0; j < 10; j++) + set.add(i + j); + + cache.getAll(set); + } + + checkBinaryData(cache, i, oneEntry, verGetErr, keys); + } + + keys = createdAtTxKeys(); + + for (int i : keys) + checkData(cache, i, oneEntry, UPDATED_ENTRY_ERR, keys); + + keys = createdAtTxWithBinaryKeys(); + + for (int i : keys) + checkBinaryData(cache, i, oneEntry, UPDATED_ENTRY_ERR, keys); + + for (int i : removedBeforeTxKeys()) + checkRemoved(cache, i, oneEntry); + + for (int i : 
removedBeforeTxWithBinaryKeys()) + checkBinaryRemoved(cache, i, oneEntry); + + for (int i : removedAtTxKeys()) + checkRemoved(cache, i, oneEntry); + + for (int i : removedAtTxWithBinaryKeys()) + checkBinaryRemoved(cache, i, oneEntry); + } + } + + /** + * + */ + private static class TestValue implements Serializable { + /** */ + private int val; + + /** + * @param val Value. + */ + public TestValue(int val) { + this.val = val; + } + + /** + * @return Value. + */ + public int value() { + return val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java new file mode 100644 index 0000000..acc21df --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * Test getEntry and getEntries methods. + */ +public class CacheGetEntryOptimisticReadCommittedSeltTest extends CacheGetEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected TransactionConcurrency concurrency() { + return TransactionConcurrency.OPTIMISTIC; + } + + /** {@inheritDoc} */ + @Override protected TransactionIsolation isolation() { + return TransactionIsolation.READ_COMMITTED; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java new file mode 100644 index 0000000..6153869 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * Test getEntry and getEntries methods. + */ +public class CacheGetEntryOptimisticRepeatableReadSeltTest extends CacheGetEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected TransactionConcurrency concurrency() { + return TransactionConcurrency.OPTIMISTIC; + } + + /** {@inheritDoc} */ + @Override protected TransactionIsolation isolation() { + return TransactionIsolation.REPEATABLE_READ; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java new file mode 100644 index 0000000..6ded4a9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * Test getEntry and getEntries methods. + */ +public class CacheGetEntryOptimisticSerializableSeltTest extends CacheGetEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected TransactionConcurrency concurrency() { + return TransactionConcurrency.OPTIMISTIC; + } + + /** {@inheritDoc} */ + @Override protected TransactionIsolation isolation() { + return TransactionIsolation.SERIALIZABLE; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java new file mode 100644 index 0000000..975d271 --- /dev/null +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * Test getEntry and getEntries methods. 
+ */ +public class CacheGetEntryPessimisticReadCommittedSeltTest extends CacheGetEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected TransactionConcurrency concurrency() { + return TransactionConcurrency.PESSIMISTIC; + } + + /** {@inheritDoc} */ + @Override protected TransactionIsolation isolation() { + return TransactionIsolation.READ_COMMITTED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java new file mode 100644 index 0000000..dac64d9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * Test getEntry and getEntries methods. + */ +public class CacheGetEntryPessimisticRepeatableReadSeltTest extends CacheGetEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected TransactionConcurrency concurrency() { + return TransactionConcurrency.PESSIMISTIC; + } + + /** {@inheritDoc} */ + @Override protected TransactionIsolation isolation() { + return TransactionIsolation.REPEATABLE_READ; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java new file mode 100644 index 0000000..70f71ce --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * Test getEntry and getEntries methods. + */ +public class CacheGetEntryPessimisticSerializableSeltTest extends CacheGetEntryAbstractTest { + /** {@inheritDoc} */ + @Override protected TransactionConcurrency concurrency() { + return TransactionConcurrency.PESSIMISTIC; + } + + /** {@inheritDoc} */ + @Override protected TransactionIsolation isolation() { + return TransactionIsolation.SERIALIZABLE; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java index c606a2a..b60ada7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java @@ -85,6 +85,20 @@ public class CacheReadThroughRestartSelfTest extends GridCacheAbstractSelfTest { * @throws Exception If failed. 
*/ public void testReadThroughInTx() throws Exception { + testReadThroughInTx(false); + } + + /** + * @throws Exception If failed. + */ + public void testReadEntryThroughInTx() throws Exception { + testReadThroughInTx(true); + } + + /** + * @throws Exception If failed. + */ + private void testReadThroughInTx(boolean needVer) throws Exception { IgniteCache<String, Integer> cache = grid(1).cache(null); for (int k = 0; k < 1000; k++) @@ -104,7 +118,14 @@ public class CacheReadThroughRestartSelfTest extends GridCacheAbstractSelfTest { for (int k = 0; k < 1000; k++) { String key = "key" + k; - assertNotNull("Null value for key: " + key, cache.get(key)); + if (needVer) { + assertNotNull("Null value for key: " + key, cache.getEntry(key)); + assertNotNull("Null value for key: " + key, cache.getEntry(key)); + } + else { + assertNotNull("Null value for key: " + key, cache.get(key)); + assertNotNull("Null value for key: " + key, cache.get(key)); + } } tx.commit(); @@ -117,6 +138,20 @@ public class CacheReadThroughRestartSelfTest extends GridCacheAbstractSelfTest { * @throws Exception If failed. */ public void testReadThrough() throws Exception { + testReadThrough(false); + } + + /** + * @throws Exception If failed. + */ + public void testReadEntryThrough() throws Exception { + testReadThrough(true); + } + + /** + * @throws Exception If failed. 
+ */ + private void testReadThrough(boolean needVer) throws Exception { IgniteCache<String, Integer> cache = grid(1).cache(null); for (int k = 0; k < 1000; k++) @@ -132,8 +167,10 @@ public class CacheReadThroughRestartSelfTest extends GridCacheAbstractSelfTest { for (int k = 0; k < 1000; k++) { String key = "key" + k; - - assertNotNull("Null value for key: " + key, cache.get(key)); + if (needVer) + assertNotNull("Null value for key: " + key, cache.getEntry(key)); + else + assertNotNull("Null value for key: " + key, cache.get(key)); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 9906ad3..4baef66 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -46,6 +47,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.store.CacheStore; @@ -575,6 +577,20 @@ public class CacheSerializableTransactionsTest extends 
GridCommonAbstractTest { * @throws Exception If failed. */ public void testTxCommitReadOnlyGetAll() throws Exception { + testTxCommitReadOnlyGetAll(false); + } + + /** + * @throws Exception If failed. + */ + public void testTxCommitReadOnlyGetEntries() throws Exception { + testTxCommitReadOnlyGetAll(true); + } + + /** + * @throws Exception If failed. + */ + public void testTxCommitReadOnlyGetAll(boolean needVer) throws Exception { Ignite ignite0 = ignite(0); final IgniteTransactions txs = ignite0.transactions(); @@ -591,9 +607,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { keys.add(i); try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Map<Integer, Integer> map = cache.getAll(keys); + if (needVer) { + Collection<CacheEntry<Integer, Integer>> c = cache.getEntries(keys); - assertTrue(map.isEmpty()); + assertTrue(c.isEmpty()); + } + else { + Map<Integer, Integer> map = cache.getAll(keys); + + assertTrue(map.isEmpty()); + } tx.commit(); } @@ -602,9 +625,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { checkValue(key, null, cache.getName()); try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Map<Integer, Integer> map = cache.getAll(keys); + if (needVer) { + Collection<CacheEntry<Integer, Integer>> c = cache.getEntries(keys); - assertTrue(map.isEmpty()); + assertTrue(c.isEmpty()); + } + else { + Map<Integer, Integer> map = cache.getAll(keys); + + assertTrue(map.isEmpty()); + } tx.rollback(); } @@ -653,21 +683,35 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testTxConflictRead1() throws Exception { - txConflictRead(true); + txConflictRead(true, false); } /** * @throws Exception If failed. */ public void testTxConflictRead2() throws Exception { - txConflictRead(false); + txConflictRead(false, false); + } + + /** + * @throws Exception If failed. 
+ */ + public void testTxConflictReadEntry1() throws Exception { + txConflictRead(true, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadEntry2() throws Exception { + txConflictRead(false, true); } /** * @param noVal If {@code true} there is no cache value when read in tx. * @throws Exception If failed. */ - private void txConflictRead(boolean noVal) throws Exception { + private void txConflictRead(boolean noVal, boolean needVer) throws Exception { Ignite ignite0 = ignite(0); final IgniteTransactions txs = ignite0.transactions(); @@ -693,9 +737,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { try { try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Integer val = cache.get(key); + if (needVer) { + CacheEntry<Integer, Integer> val = cache.getEntry(key); - assertEquals(expVal, val); + assertEquals(expVal, val == null ? null : val.getValue()); + } + else { + Integer val = cache.get(key); + + assertEquals(expVal, val); + } updateKey(cache, key, 1); @@ -711,9 +762,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { checkValue(key, 1, cache.getName()); try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Object val = cache.get(key); + if (needVer) { + CacheEntry<Integer, Integer> val = cache.getEntry(key); - assertEquals(1, val); + assertEquals((Integer)1, val.getValue()); + } + else { + Object val = cache.get(key); + + assertEquals(1, val); + } tx.commit(); } @@ -731,28 +789,56 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testTxConflictReadWrite1() throws Exception { - txConflictReadWrite(true, false); + txConflictReadWrite(true, false, false); } /** * @throws Exception If failed. 
*/ public void testTxConflictReadWrite2() throws Exception { - txConflictReadWrite(false, false); + txConflictReadWrite(false, false, false); } /** * @throws Exception If failed. */ public void testTxConflictReadRemove1() throws Exception { - txConflictReadWrite(true, true); + txConflictReadWrite(true, true, false); } /** * @throws Exception If failed. */ public void testTxConflictReadRemove2() throws Exception { - txConflictReadWrite(false, true); + txConflictReadWrite(false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadEntryWrite1() throws Exception { + txConflictReadWrite(true, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadEntryWrite2() throws Exception { + txConflictReadWrite(false, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadEntryRemove1() throws Exception { + txConflictReadWrite(true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadEntryRemove2() throws Exception { + txConflictReadWrite(false, true, true); } /** @@ -760,7 +846,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { * @param rmv If {@code true} tests remove, otherwise put. * @throws Exception If failed. */ - private void txConflictReadWrite(boolean noVal, boolean rmv) throws Exception { + private void txConflictReadWrite(boolean noVal, boolean rmv, boolean needVer) throws Exception { Ignite ignite0 = ignite(0); final IgniteTransactions txs = ignite0.transactions(); @@ -786,9 +872,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { try { try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Integer val = cache.get(key); + if (needVer) { + CacheEntry<Integer, Integer> val = cache.getEntry(key); - assertEquals(expVal, val); + assertEquals(expVal, val == null ? 
null : val.getValue()); + } + else { + Integer val = cache.get(key); + + assertEquals(expVal, val); + } updateKey(cache, key, 1); @@ -809,9 +902,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { checkValue(key, 1, cache.getName()); try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Integer val = cache.get(key); + if (needVer) { + CacheEntry<Integer, Integer> val = cache.getEntry(key); + + assertEquals(1, (Object)val.getValue()); + } + else { + Integer val = cache.get(key); - assertEquals(1, (Object) val); + assertEquals(1, (Object)val); + } if (rmv) cache.remove(key); @@ -4239,7 +4339,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { ignite.destroyCache(cacheName); } catch (IgniteException ignore) { - // No-op. + // No-op. } GridTestSwapSpaceSpi spi = (GridTestSwapSpaceSpi)ignite.configuration().getSwapSpaceSpi();
