http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 8b10a92..45b13b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -50,11 +50,11 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { private static final long serialVersionUID = 0L; /** Near mappings. */ - protected Map<UUID, GridDistributedTxMapping<K, V>> nearMap = + protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>(); /** DHT mappings. */ - protected Map<UUID, GridDistributedTxMapping<K, V>> dhtMap = + protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>(); /** Mapped flag. */ @@ -90,7 +90,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param partLock If this is a group-lock transaction and the whole partition should be locked. */ protected GridDhtTxLocalAdapter( - GridCacheSharedContext<K, V> cctx, + GridCacheSharedContext cctx, GridCacheVersion xidVer, boolean implicit, boolean implicitSingle, @@ -140,7 +140,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @return {@code True} if reader was added as a result of this call. */ @Nullable protected abstract IgniteInternalFuture<Boolean> addReader(long msgId, - GridDhtCacheEntry<K, V> cached, + GridDhtCacheEntry cached, IgniteTxEntry entry, long topVer); @@ -195,13 +195,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { return; } - Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> dhtEntryMap = null; - Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> nearEntryMap = null; + Map<ClusterNode, List<GridDhtCacheEntry>> dhtEntryMap = null; + Map<ClusterNode, List<GridDhtCacheEntry>> nearEntryMap = null; for (IgniteTxEntry e : allEntries()) { assert e.cached() != null; - GridCacheContext<K, V> cacheCtx = e.cached().context(); + GridCacheContext cacheCtx = e.cached().context(); if (cacheCtx.isNear()) continue; @@ -226,7 +226,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { nearEntryMap = new GridLeanMap<>(); cacheCtx.dhtMap(nearNodeId(), topologyVersion(), - (GridDhtCacheEntry<K, V>)e.cached(), log, dhtEntryMap, nearEntryMap); + (GridDhtCacheEntry)e.cached(), log, dhtEntryMap, nearEntryMap); } break; @@ -252,7 +252,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** * @return DHT map. */ - Map<UUID, GridDistributedTxMapping<K, V>> dhtMap() { + Map<UUID, GridDistributedTxMapping> dhtMap() { mapExplicitLocks(); return dhtMap; @@ -261,7 +261,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** * @return Near map. */ - Map<UUID, GridDistributedTxMapping<K, V>> nearMap() { + Map<UUID, GridDistributedTxMapping> nearMap() { mapExplicitLocks(); return nearMap; @@ -271,7 +271,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param nodeId Node ID. * @return Mapping. */ - GridDistributedTxMapping<K, V> dhtMapping(UUID nodeId) { + GridDistributedTxMapping dhtMapping(UUID nodeId) { return dhtMap.get(nodeId); } @@ -279,35 +279,35 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param nodeId Node ID. * @return Mapping. */ - GridDistributedTxMapping<K, V> nearMapping(UUID nodeId) { + GridDistributedTxMapping nearMapping(UUID nodeId) { return nearMap.get(nodeId); } /** * @param mappings Mappings to add. */ - void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings) { + void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) { addMapping(mappings, dhtMap); } /** * @param mappings Mappings to add. */ - void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings) { + void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) { addMapping(mappings, nearMap); } /** * @param mappings Mappings to add. */ - public void addDhtMapping(Map<UUID, GridDistributedTxMapping<K, V>> mappings) { + public void addDhtMapping(Map<UUID, GridDistributedTxMapping> mappings) { addMapping0(mappings, dhtMap); } /** * @param mappings Mappings to add. */ - public void addNearMapping(Map<UUID, GridDistributedTxMapping<K, V>> mappings) { + public void addNearMapping(Map<UUID, GridDistributedTxMapping> mappings) { addMapping0(mappings, nearMap); } /** @@ -343,7 +343,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @return {@code True} if was removed. */ private boolean removeMapping(UUID nodeId, @Nullable GridCacheEntryEx entry, - Map<UUID, GridDistributedTxMapping<K, V>> map) { + Map<UUID, GridDistributedTxMapping> map) { if (entry != null) { if (log.isDebugEnabled()) log.debug("Removing mapping for entry [nodeId=" + nodeId + ", entry=" + entry + ']'); @@ -353,7 +353,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (txEntry == null) return false; - GridDistributedTxMapping<K, V> m = map.get(nodeId); + GridDistributedTxMapping m = map.get(nodeId); boolean ret = m != null && m.removeEntry(txEntry); @@ -371,22 +371,22 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param dst Transaction mappings. */ private void addMapping( - Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> mappings, - Map<UUID, GridDistributedTxMapping<K, V>> dst + Map<ClusterNode, List<GridDhtCacheEntry>> mappings, + Map<UUID, GridDistributedTxMapping> dst ) { - for (Map.Entry<ClusterNode, List<GridDhtCacheEntry<K, V>>> mapping : mappings.entrySet()) { + for (Map.Entry<ClusterNode, List<GridDhtCacheEntry>> mapping : mappings.entrySet()) { ClusterNode n = mapping.getKey(); - GridDistributedTxMapping<K, V> m = dst.get(n.id()); + GridDistributedTxMapping m = dst.get(n.id()); - List<GridDhtCacheEntry<K, V>> entries = mapping.getValue(); + List<GridDhtCacheEntry> entries = mapping.getValue(); - for (GridDhtCacheEntry<K, V> entry : entries) { + for (GridDhtCacheEntry entry : entries) { IgniteTxEntry txEntry = txMap.get(entry.txKey()); if (txEntry != null) { if (m == null) - dst.put(n.id(), m = new GridDistributedTxMapping<>(n)); + dst.put(n.id(), m = new GridDistributedTxMapping(n)); m.add(txEntry); } @@ -399,11 +399,11 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param dst Map to add to. */ private void addMapping0( - Map<UUID, GridDistributedTxMapping<K, V>> mappings, - Map<UUID, GridDistributedTxMapping<K, V>> dst + Map<UUID, GridDistributedTxMapping> mappings, + Map<UUID, GridDistributedTxMapping> dst ) { - for (Map.Entry<UUID, GridDistributedTxMapping<K, V>> entry : mappings.entrySet()) { - GridDistributedTxMapping<K, V> targetMapping = dst.get(entry.getKey()); + for (Map.Entry<UUID, GridDistributedTxMapping> entry : mappings.entrySet()) { + GridDistributedTxMapping targetMapping = dst.get(entry.getKey()); if (targetMapping == null) dst.put(entry.getKey(), entry.getValue()); @@ -415,7 +415,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** {@inheritDoc} */ - @Override public void addInvalidPartition(GridCacheContext<K, V> ctx, int part) { + @Override public void addInvalidPartition(GridCacheContext ctx, int part) { assert false : "DHT transaction encountered invalid partition [part=" + part + ", tx=" + this + ']'; } @@ -437,9 +437,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { checkInternal(e.txKey()); - GridCacheContext<K, V> cacheCtx = e.context(); + GridCacheContext cacheCtx = e.context(); - GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); + GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); try { IgniteTxEntry entry = txMap.get(e.txKey()); @@ -448,7 +448,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { entry.op(e.op()); // Absolutely must set operation, as default is DELETE. entry.value(e.value(), e.hasWriteValue(), e.hasReadValue()); entry.entryProcessors(e.entryProcessors()); - entry.valueBytes(e.valueBytes()); entry.ttl(e.ttl()); entry.filters(e.filters()); entry.expiry(e.expiry()); @@ -461,22 +460,27 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { addActiveCache(dhtCache.context()); - while (true) { - GridDhtCacheEntry<K, V> cached = dhtCache.entryExx(entry.key(), topologyVersion()); - - try { - // Set key bytes to avoid serializing in future. - cached.keyBytes(entry.keyBytes()); - - entry.cached(cached, entry.keyBytes()); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry when adding to dht tx (will retry): " + cached); - } - } + GridDhtCacheEntry cached = dhtCache.entryExx(entry.key(), topologyVersion()); + + entry.cached(cached, null); + +// TODO IGNITE-51. +// while (true) { +// GridDhtCacheEntry cached = dhtCache.entryExx(entry.key(), topologyVersion()); +// +// try { +// // Set key bytes to avoid serializing in future. +// cached.keyBytes(entry.keyBytes()); +// +// entry.cached(cached, null); +// +// break; +// } +// catch (GridCacheEntryRemovedException ignore) { +// if (log.isDebugEnabled()) +// log.debug("Got removed entry when adding to dht tx (will retry): " + cached); +// } +// } GridCacheVersion explicit = entry.explicitVersion(); @@ -514,8 +518,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @return Lock future. */ @SuppressWarnings("ForLoopReplaceableByForEach") - IgniteInternalFuture<GridCacheReturn<V>> lockAllAsync( - GridCacheContext<K, V> cacheCtx, + IgniteInternalFuture<GridCacheReturn<Object>> lockAllAsync( + GridCacheContext cacheCtx, List<GridCacheEntryEx> entries, boolean onePhaseCommit, long msgId, @@ -529,7 +533,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { return new GridFinishedFuture<>(cctx.kernalContext(), e); } - final GridCacheReturn<V> ret = new GridCacheReturn<>(false); + final GridCacheReturn<Object> ret = new GridCacheReturn<>(false); if (F.isEmpty(entries)) return new GridFinishedFuture<>(cctx.kernalContext(), ret); @@ -539,23 +543,23 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { onePhaseCommit(onePhaseCommit); try { - Set<K> skipped = null; + Set<KeyCacheObject> skipped = null; long topVer = topologyVersion(); - GridDhtCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); + GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); // Enlist locks into transaction. for (int i = 0; i < entries.size(); i++) { GridCacheEntryEx entry = entries.get(i); - K key = entry.key(); + KeyCacheObject key = entry.key(); IgniteTxEntry txEntry = entry(entry.txKey()); // First time access. if (txEntry == null) { - GridDhtCacheEntry<K, V> cached = dhtCache.entryExx(key, topVer); + GridDhtCacheEntry cached = dhtCache.entryExx(key, topVer); addActiveCache(dhtCache.context()); @@ -567,7 +571,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { null, cached, null, - CU.<K, V>empty(), + CU.empty(), false, -1L, -1L, @@ -576,7 +580,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (read) txEntry.ttl(accessTtl); - txEntry.cached(cached, txEntry.keyBytes()); + txEntry.cached(cached, null); addReader(msgId, cached, txEntry, topVer); } @@ -590,13 +594,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { assert pessimistic(); - Collection<K> keys = F.viewReadOnly(entries, CU.<K, V>entry2Key()); + Collection<KeyCacheObject> keys = F.viewReadOnly(entries, CU.entry2Key()); // Acquire locks only after having added operation to the write set. // Otherwise, during rollback we will not know whether locks need // to be rolled back. // Loose all skipped and previously locked (we cannot reenter locks here). - final Collection<? extends K> passedKeys = skipped != null ? F.view(keys, F0.notIn(skipped)) : keys; + final Collection<KeyCacheObject> passedKeys = skipped != null ? F.view(keys, F0.notIn(skipped)) : keys; if (log.isDebugEnabled()) log.debug("Lock keys: " + passedKeys); @@ -620,14 +624,14 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param filter Entry write filter. * @return Future for lock acquisition. */ - private IgniteInternalFuture<GridCacheReturn<V>> obtainLockAsync( - final GridCacheContext<K, V> cacheCtx, - GridCacheReturn<V> ret, - final Collection<? extends K> passedKeys, + private IgniteInternalFuture<GridCacheReturn<Object>> obtainLockAsync( + final GridCacheContext cacheCtx, + GridCacheReturn<Object> ret, + final Collection<KeyCacheObject> passedKeys, final boolean read, - final Set<K> skipped, + final Set<KeyCacheObject> skipped, final long accessTtl, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable final IgnitePredicate<Cache.Entry<Object, Object>>[] filter) { if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" + skipped + ']'); @@ -635,22 +639,24 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (passedKeys.isEmpty()) return new GridFinishedFuture<>(cctx.kernalContext(), ret); - GridDhtTransactionalCacheAdapter<K, V> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx(); + GridDhtTransactionalCacheAdapter<?, ?> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx(); - IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys, - lockTimeout(), - this, - isInvalidate(), - read, - /*retval*/false, - isolation, - accessTtl, - CU.<K, V>empty()); + IgniteInternalFuture<Boolean> fut = null; +// TODO IGNTIE-51 +// IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys, +// lockTimeout(), +// this, +// isInvalidate(), +// read, +// /*retval*/false, +// isolation, +// accessTtl, +// CU.empty()); return new GridEmbeddedFuture<>( fut, - new PLC1<GridCacheReturn<V>>(ret) { - @Override protected GridCacheReturn<V> postLock(GridCacheReturn<V> ret) throws IgniteCheckedException { + new PLC1<GridCacheReturn<Object>>(ret) { + @Override protected GridCacheReturn<Object> postLock(GridCacheReturn<Object> ret) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Acquired transaction lock on keys: " + passedKeys); @@ -662,7 +668,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /*retval*/false, /*read*/read, accessTtl, - filter == null ? CU.<K, V>empty() : filter, + filter == null ? CU.empty() : filter, /**computeInvoke*/false); return ret; @@ -672,10 +678,10 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** {@inheritDoc} */ - @Override protected void addGroupTxMapping(Collection<IgniteTxKey<K>> keys) { + @Override protected void addGroupTxMapping(Collection<IgniteTxKey> keys) { assert groupLock(); - for (GridDistributedTxMapping<K, V> mapping : dhtMap.values()) + for (GridDistributedTxMapping mapping : dhtMap.values()) mapping.entries(Collections.unmodifiableCollection(txMap.values()), true); // Here we know that affinity key for all given keys is our group lock key. @@ -683,9 +689,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { // Add near readers. If near cache is disabled on all nodes, do nothing. Collection<UUID> backupIds = dhtMap.keySet(); - Map<ClusterNode, List<GridDhtCacheEntry<K, V>>> locNearMap = null; + Map<ClusterNode, List<GridDhtCacheEntry>> locNearMap = null; - for (IgniteTxKey<K> key : keys) { + for (IgniteTxKey key : keys) { IgniteTxEntry txEntry = entry(key); if (!txEntry.groupLockEntry() || txEntry.context().isNear()) @@ -695,7 +701,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { while (true) { try { - GridDhtCacheEntry<K, V> entry = (GridDhtCacheEntry<K, V>)txEntry.cached(); + GridDhtCacheEntry entry = (GridDhtCacheEntry)txEntry.cached(); Collection<UUID> readers = entry.readers(); @@ -711,7 +717,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (locNearMap == null) locNearMap = new HashMap<>(); - List<GridDhtCacheEntry<K, V>> entries = locNearMap.get(n); + List<GridDhtCacheEntry> entries = locNearMap.get(n); if (entries == null) locNearMap.put(n, entries = new LinkedList<>()); @@ -724,7 +730,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } catch (GridCacheEntryRemovedException ignored) { // Retry. - txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion()), txEntry.keyBytes()); + txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion()), null); } } } @@ -823,7 +829,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * * @param fut Expected future. */ - protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture<K, V> fut); + protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture fut); /** {@inheritDoc} */ @Override public void rollback() throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java index 0bbc33f..d207d76 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java @@ -77,12 +77,12 @@ public class GridDhtTxMapping<K, V> { * * @param mappings Mappings. */ - public void initLast(Collection<GridDistributedTxMapping<K, V>> mappings) { + public void initLast(Collection<GridDistributedTxMapping> mappings) { assert this.mappings.size() == mappings.size(); int idx = 0; - for (GridDistributedTxMapping<?, ?> map : mappings) { + for (GridDistributedTxMapping map : mappings) { TxMapping mapping = this.mappings.get(idx); map.lastBackups(lastBackups(mapping, idx)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index f9e4cae..9344b2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -65,13 +65,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** Transaction. */ @GridToStringExclude - private GridDhtTxLocalAdapter<K, V> tx; + private GridDhtTxLocalAdapter tx; /** Near mappings. */ - private Map<UUID, GridDistributedTxMapping<K, V>> nearMap; + private Map<UUID, GridDistributedTxMapping> nearMap; /** DHT mappings. */ - private Map<UUID, GridDistributedTxMapping<K, V>> dhtMap; + private Map<UUID, GridDistributedTxMapping> dhtMap; /** Logger. */ private IgniteLogger log; @@ -101,7 +101,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu private IgniteUuid nearMiniId; /** DHT versions map. */ - private Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap; + private Map<IgniteTxKey, GridCacheVersion> dhtVerMap; /** {@code True} if this is last prepare operation for node. */ private boolean last; @@ -113,19 +113,19 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu private boolean retVal; /** Return value. */ - private GridCacheReturn<V> ret; + private GridCacheReturn<CacheObject> ret; /** Keys that did not pass the filter. */ - private Collection<IgniteTxKey<K>> filterFailedKeys; + private Collection<IgniteTxKey> filterFailedKeys; /** Keys that should be locked. */ - private GridConcurrentHashSet<IgniteTxKey<K>> lockKeys = new GridConcurrentHashSet<>(); + private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); /** Locks ready flag. */ private volatile boolean locksReady; /** */ - private IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb; + private IgniteInClosure<GridNearTxPrepareResponse> completeCb; /** * Empty constructor required for {@link Externalizable}. @@ -143,14 +143,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. */ public GridDhtTxPrepareFuture( - GridCacheSharedContext<K, V> cctx, - final GridDhtTxLocalAdapter<K, V> tx, + GridCacheSharedContext cctx, + final GridDhtTxLocalAdapter tx, IgniteUuid nearMiniId, - Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap, + Map<IgniteTxKey, GridCacheVersion> dhtVerMap, boolean last, boolean retVal, Collection<UUID> lastBackups, - IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + IgniteInClosure<GridNearTxPrepareResponse> completeCb ) { super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() { @Override public boolean collect(IgniteInternalTx e) { @@ -241,7 +241,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** * @return Transaction. */ - GridDhtTxLocalAdapter<K, V> tx() { + GridDhtTxLocalAdapter tx() { return tx; } @@ -275,7 +275,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu ret = new GridCacheReturn<>(null, true); for (IgniteTxEntry txEntry : tx.optimisticLockEntries()) { - GridCacheContext<K, V> cacheCtx = txEntry.context(); + GridCacheContext cacheCtx = txEntry.context(); GridCacheEntryEx cached = txEntry.cached(); @@ -297,7 +297,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) { cached.unswap(true, retVal); - V val = cached.innerGet( + CacheObject val = cached.innerGet( tx, /*swap*/true, /*read through*/(retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue(), @@ -313,21 +313,24 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (retVal) { if (!F.isEmpty(txEntry.entryProcessors())) { - K key = txEntry.key(); + KeyCacheObject key = txEntry.key(); Object procRes = null; Exception err = null; + Object keyVal = key.value(cacheCtx); + Object val0 = CU.value(val, cacheCtx); - for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) { + for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { try { - CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.context(), key, val); + CacheInvokeEntry<Object, Object> invokeEntry = + new CacheInvokeEntry<>(txEntry.context(), keyVal, val0); - EntryProcessor<K, V, ?> processor = t.get1(); + EntryProcessor<Object, Object, Object> processor = t.get1(); procRes = processor.process(invokeEntry, t.get2()); - val = invokeEntry.getValue(); + val = cacheCtx.toCacheObject(invokeEntry.getValue()); } catch (Exception e) { err = e; @@ -346,7 +349,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu ret.value(val); } - if (hasFilters && !cacheCtx.isAll(cached, txEntry.filters())) { + if (hasFilters && !cacheCtx.isAll(cached.wrapLazyValue(), txEntry.filters())) { if (expiry != null) txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess())); @@ -386,7 +389,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * @param nodeId Sender. * @param res Result. */ - public void onResult(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) { + public void onResult(UUID nodeId, GridDhtTxPrepareResponse res) { if (!isDone()) { for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { if (isMini(fut)) { @@ -416,17 +419,17 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu Collections.singletonList(tx.groupLockEntry()) : writes; for (IgniteTxEntry txEntry : checkEntries) { - GridCacheContext<K, V> cacheCtx = txEntry.context(); + GridCacheContext cacheCtx = txEntry.context(); if (cacheCtx.isLocal()) continue; - GridDistributedCacheEntry<K, V> entry = (GridDistributedCacheEntry<K, V>)txEntry.cached(); + GridDistributedCacheEntry entry = (GridDistributedCacheEntry)txEntry.cached(); if (entry == null) { - entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key()); + entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key()); - txEntry.cached(entry, txEntry.keyBytes()); + txEntry.cached(entry, null); } if (tx.optimistic() && txEntry.explicitVersion() == null) { @@ -450,9 +453,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (log.isDebugEnabled()) log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry); - entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key()); + entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key()); - txEntry.cached(entry, txEntry.keyBytes()); + txEntry.cached(entry, null); } } } @@ -490,13 +493,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu assert last; // Must create prepare response before transaction is committed to grab correct return value. - final GridNearTxPrepareResponse<K, V> res = createPrepareResponse(); + final GridNearTxPrepareResponse res = createPrepareResponse(); onComplete(); if (!tx.near()) { if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) { - IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync(); + IgniteInternalFuture<IgniteInternalTx> fut = + this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync(); fut.listenAsync(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> gridCacheTxGridFuture) { @@ -559,7 +563,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** * @throws IgniteCheckedException If failed to send response. */ - private void sendPrepareResponse(GridNearTxPrepareResponse<K, V> res) throws IgniteCheckedException { + private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException { if (!tx.nearNodeId().equals(cctx.localNodeId())) cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy()); else { @@ -572,11 +576,11 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** * @return Prepare response. */ - private GridNearTxPrepareResponse<K, V> createPrepareResponse() { + private GridNearTxPrepareResponse createPrepareResponse() { // Send reply back to originating near node. Throwable prepErr = err.get(); - GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>( + GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( tx.nearXidVersion(), tx.colocated() ? tx.xid() : tx.nearFutureId(), nearMiniId == null ? tx.xid() : nearMiniId, @@ -605,13 +609,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** * @param res Response being sent. */ - private void addDhtValues(GridNearTxPrepareResponse<K, V> res) { + private void addDhtValues(GridNearTxPrepareResponse res) { // Interceptor on near node needs old values to execute callbacks. if (!F.isEmpty(writes)) { for (IgniteTxEntry e : writes) { IgniteTxEntry txEntry = tx.entry(e.txKey()); - GridCacheContext<K, V> cacheCtx = txEntry.context(); + GridCacheContext cacheCtx = txEntry.context(); assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']'; @@ -621,16 +625,17 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu GridCacheVersion dhtVer = entry.version(); - V val0 = null; + CacheObject val0 = null; byte[] valBytes0 = null; GridCacheValueBytes valBytesTuple = entry.valueBytes(); if (!valBytesTuple.isNull()) { - if (valBytesTuple.isPlain()) - val0 = (V) valBytesTuple.get(); - else - valBytes0 = valBytesTuple.get(); +// TODO IGNITE-51 +// if (valBytesTuple.isPlain()) +// val0 = (V) valBytesTuple.get(); +// else +// valBytes0 = valBytesTuple.get(); } else val0 = entry.rawGet(); @@ -642,19 +647,19 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } catch (GridCacheEntryRemovedException ignored) { // Retry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), txEntry.keyBytes()); + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null); } } } } - for (Map.Entry<IgniteTxKey<K>, GridCacheVersion> ver : dhtVerMap.entrySet()) { + for (Map.Entry<IgniteTxKey, GridCacheVersion> ver : dhtVerMap.entrySet()) { IgniteTxEntry txEntry = tx.entry(ver.getKey()); if (res.hasOwnedValue(ver.getKey())) continue; - GridCacheContext<K, V> cacheCtx = txEntry.context(); + GridCacheContext cacheCtx = txEntry.context(); while (true) { try { @@ -663,16 +668,17 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu GridCacheVersion dhtVer = entry.version(); if (ver.getValue() == null || !ver.getValue().equals(dhtVer)) { - V val0 = null; + CacheObject val0 = null; byte[] valBytes0 = null; GridCacheValueBytes valBytesTuple = entry.valueBytes(); if (!valBytesTuple.isNull()) { - if (valBytesTuple.isPlain()) - val0 = (V)valBytesTuple.get(); - else - valBytes0 = valBytesTuple.get(); +// TODO IGNITE-51. +// if (valBytesTuple.isPlain()) +// val0 = (V)valBytesTuple.get(); +// else +// valBytes0 = valBytesTuple.get(); } else val0 = entry.rawGet(); @@ -684,7 +690,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } catch (GridCacheEntryRemovedException ignored) { // Retry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), txEntry.keyBytes()); + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null); } } } @@ -770,8 +776,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu onEntriesLocked(); { - Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap = new HashMap<>(); - Map<UUID, GridDistributedTxMapping<K, V>> futNearMap = new HashMap<>(); + Map<UUID, GridDistributedTxMapping> futDhtMap = new HashMap<>(); + Map<UUID, GridDistributedTxMapping> futNearMap = new HashMap<>(); boolean hasRemoteNodes = false; @@ -796,14 +802,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu assert tx.transactionNodes() != null; // Create mini futures. - for (GridDistributedTxMapping<K, V> dhtMapping : tx.dhtMap().values()) { + for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) { assert !dhtMapping.empty(); ClusterNode n = dhtMapping.node(); assert !n.isLocal(); - GridDistributedTxMapping<K, V> nearMapping = tx.nearMap().get(n.id()); + GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id()); Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes(); @@ -818,7 +824,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu assert txNodes != null; - GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>( + GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( futId, fut.futureId(), tx.topologyVersion(), @@ -838,7 +844,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu for (IgniteTxEntry entry : dhtWrites) { try { - GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); + GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); GridCacheContext<K, V> cacheCtx = cached.context(); @@ -909,7 +915,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } } - for (GridDistributedTxMapping<K, V> nearMapping : tx.nearMap().values()) { + for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { if (!tx.dhtMap().containsKey(nearMapping.node().id())) { assert nearMapping.writes() != null; @@ -917,7 +923,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu add(fut); // Append new future. - GridDhtTxPrepareRequest<K, V> req = new GridDhtTxPrepareRequest<>( + GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( futId, fut.futureId(), tx.topologyVersion(), @@ -981,14 +987,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu */ private boolean map( IgniteTxEntry entry, - Map<UUID, GridDistributedTxMapping<K, V>> futDhtMap, - Map<UUID, GridDistributedTxMapping<K, V>> futNearMap) { + Map<UUID, GridDistributedTxMapping> futDhtMap, + Map<UUID, GridDistributedTxMapping> futNearMap) { if (entry.cached().isLocal()) return false; - GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); + GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); - GridCacheContext<K, V> cacheCtx = entry.context(); + GridCacheContext cacheCtx = entry.context(); GridDhtCacheAdapter<K, V> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); @@ -1050,22 +1056,22 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * @return {@code True} if mapped. */ private boolean map(IgniteTxEntry entry, Iterable<ClusterNode> nodes, - Map<UUID, GridDistributedTxMapping<K, V>> globalMap, Map<UUID, GridDistributedTxMapping<K, V>> locMap) { + Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap) { boolean ret = false; if (nodes != null) { for (ClusterNode n : nodes) { - GridDistributedTxMapping<K, V> global = globalMap.get(n.id()); + GridDistributedTxMapping global = globalMap.get(n.id()); if (global == null) - globalMap.put(n.id(), global = new GridDistributedTxMapping<>(n)); + globalMap.put(n.id(), global = new GridDistributedTxMapping(n)); global.add(entry); - GridDistributedTxMapping<K, V> loc = locMap.get(n.id()); + GridDistributedTxMapping loc = locMap.get(n.id()); if (loc == null) - locMap.put(n.id(), loc = new GridDistributedTxMapping<>(n)); + locMap.put(n.id(), loc = new GridDistributedTxMapping(n)); loc.add(entry); @@ -1123,11 +1129,11 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** DHT mapping. */ @GridToStringInclude - private GridDistributedTxMapping<K, V> dhtMapping; + private GridDistributedTxMapping dhtMapping; /** Near mapping. */ @GridToStringInclude - private GridDistributedTxMapping<K, V> nearMapping; + private GridDistributedTxMapping nearMapping; /** * Empty constructor required for {@link Externalizable}. @@ -1141,7 +1147,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * @param dhtMapping Mapping. * @param nearMapping nearMapping. */ - MiniFuture(UUID nodeId, GridDistributedTxMapping<K, V> dhtMapping, GridDistributedTxMapping<K, V> nearMapping) { + MiniFuture(UUID nodeId, GridDistributedTxMapping dhtMapping, GridDistributedTxMapping nearMapping) { super(cctx.kernalContext()); assert dhtMapping == null || nearMapping == null || dhtMapping.node() == nearMapping.node(); @@ -1192,7 +1198,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu /** * @param res Result callback. */ - void onResult(GridDhtTxPrepareResponse<K, V> res) { + void onResult(GridDhtTxPrepareResponse res) { if (res.error() != null) // Fail the whole compound future. onError(res.error()); @@ -1203,7 +1209,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (res.nearEvicted().contains(entry.txKey())) { while (true) { try { - GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached(); + GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); cached.removeReader(nearMapping.node().id(), res.messageId()); @@ -1215,7 +1221,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu if (e == null) break; - entry.cached(e, entry.keyBytes()); + entry.cached(e, null); } } } @@ -1251,7 +1257,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED); - for (GridCacheEntryInfo<K, V> info : res.preloadEntries()) { + for (GridCacheEntryInfo info : res.preloadEntries()) { GridCacheContext<K, V> cacheCtx = cctx.cacheContext(info.cacheId()); while (true) { @@ -1260,7 +1266,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE; try { - if (entry.initialValue(info.value(), info.valueBytes(), info.version(), + if (entry.initialValue(info.value(), null, info.version(), info.ttl(), info.expireTime(), true, topVer, drType)) { if (rec && !entry.isInternal()) cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index a3936d4..8579747 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -139,7 +139,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { * @param grpLockKey Group lock key if transaction is group-lock. */ public GridDhtTxRemote( - GridCacheSharedContext<K, V> ctx, + GridCacheSharedContext ctx, UUID nearNodeId, IgniteUuid rmtFutId, UUID nodeId, @@ -219,7 +219,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { } /** {@inheritDoc} */ - @Override protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) { + @Override protected boolean updateNearCache(GridCacheContext cacheCtx, KeyCacheObject key, long topVer) { if (!cacheCtx.isDht() || !isNearEnabled(cacheCtx) || cctx.localNodeId().equals(nearNodeId)) return false; @@ -231,7 +231,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { } /** {@inheritDoc} */ - @Override public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int part) { + @Override public void addInvalidPartition(GridCacheContext cacheCtx, int part) { super.addInvalidPartition(cacheCtx, part); for (Iterator<IgniteTxEntry> it = writeMap.values().iterator(); it.hasNext();) { @@ -256,15 +256,15 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { public void addWrite(IgniteTxEntry entry, ClassLoader ldr) throws IgniteCheckedException { entry.unmarshal(cctx, false, ldr); - GridCacheContext<K, V> cacheCtx = entry.context(); + GridCacheContext cacheCtx = entry.context(); try { - GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(entry.key(), topologyVersion()); + GridDhtCacheEntry cached = cacheCtx.dht().entryExx(entry.key(), topologyVersion()); checkInternal(entry.txKey()); // Initialize cache entry. - entry.cached(cached, entry.keyBytes()); + entry.cached(cached, null); writeMap.put(entry.txKey(), entry); @@ -285,22 +285,22 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { * @param entryProcessors Entry processors. * @param ttl TTL. */ - public void addWrite(GridCacheContext<K, V> cacheCtx, + public void addWrite(GridCacheContext cacheCtx, GridCacheOperation op, - IgniteTxKey<K> key, + IgniteTxKey key, byte[] keyBytes, - @Nullable V val, + @Nullable CacheObject val, @Nullable byte[] valBytes, - @Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors, + @Nullable Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessors, long ttl) { checkInternal(key); if (isSystemInvalidate()) return; - GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(key.key(), topologyVersion()); + GridDhtCacheEntry cached = cacheCtx.dht().entryExx(key.key(), topologyVersion()); - IgniteTxEntry txEntry = new IgniteTxEntry<>(cacheCtx, + IgniteTxEntry txEntry = new IgniteTxEntry(cacheCtx, this, op, val, @@ -309,8 +309,9 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { cached, null); - txEntry.keyBytes(keyBytes); - txEntry.valueBytes(valBytes); +// TODO IGNITE-51. +// txEntry.keyBytes(keyBytes); +// txEntry.valueBytes(valBytes); txEntry.entryProcessors(entryProcessors); writeMap.put(key, txEntry); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java index 95c7fc9..a818500 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java @@ -27,15 +27,15 @@ import java.util.*; /** * Empty cache map that will never store any entries. */ -public class GridNoStorageCacheMap<K, V> extends GridCacheConcurrentMap<K, V> { +public class GridNoStorageCacheMap extends GridCacheConcurrentMap { /** Empty triple. */ - private final GridTriple<GridCacheMapEntry<K,V>> emptyTriple = + private final GridTriple<GridCacheMapEntry> emptyTriple = new GridTriple<>(null, null, null); /** * @param ctx Cache context. */ - public GridNoStorageCacheMap(GridCacheContext<K, V> ctx) { + public GridNoStorageCacheMap(GridCacheContext ctx) { super(ctx, 0, 0.75f, 1); } @@ -60,25 +60,29 @@ public class GridNoStorageCacheMap<K, V> extends GridCacheConcurrentMap<K, V> { } /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> randomEntry() { + @Override public GridCacheMapEntry randomEntry() { return null; } /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> getEntry(Object key) { + @Override public GridCacheMapEntry getEntry(Object key) { return null; } /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> putEntry(long topVer, K key, @Nullable V val, long ttl) { + @Override public GridCacheMapEntry putEntry(long topVer, KeyCacheObject key, @Nullable CacheObject val, long ttl) { throw new AssertionError(); } /** {@inheritDoc} */ - @Override public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(long topVer, K key, @Nullable V val, - long ttl, boolean create) { + @Override public GridTriple<GridCacheMapEntry> putEntryIfObsoleteOrAbsent(long topVer, + KeyCacheObject key, + @Nullable CacheObject val, + long ttl, + boolean create) + { if (create) { - GridCacheMapEntry<K, V> entry = new GridDhtCacheEntry<>(ctx, topVer, key, hash(key.hashCode()), val, + GridCacheMapEntry entry = new GridDhtCacheEntry(ctx, topVer, key, hash(key.hashCode()), val, null, 0, 0); return new GridTriple<>(entry, null, null); @@ -88,7 +92,7 @@ public class GridNoStorageCacheMap<K, V> extends GridCacheConcurrentMap<K, V> { } /** {@inheritDoc} */ - @Override public void putAll(Map<? extends K, ? extends V> m, long ttl) { + @Override public void putAll(Map<KeyCacheObject, CacheObject> m, long ttl) { throw new AssertionError(); } @@ -98,7 +102,7 @@ public class GridNoStorageCacheMap<K, V> extends GridCacheConcurrentMap<K, V> { } /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> removeEntryIfObsolete(K key) { + @Override public GridCacheMapEntry removeEntryIfObsolete(KeyCacheObject key) { throw new AssertionError(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index ece2fa3..eef10d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -281,7 +281,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M return; } - Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = + Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings = U.newHashMap(CU.affinityNodes(cctx, topVer).size()); final int keysSize = keys.size(); @@ -299,8 +299,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M throw err; } - - hasRmtNodes |= map(key, mappings, locVals, topVer, mapped); +// TODO IGNITE-51. +// hasRmtNodes |= map(key, mappings, locVals, topVer, mapped); } if (isDone()) @@ -328,7 +328,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cache().getDhtAsync(n.id(), -1, - mappedKeys, + // TODO IGNITE-51 + // mappedKeys, + null, readThrough, reload, topVer, @@ -355,7 +357,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M ", invalidParts=" + invalidParts + ']'; // Remap recursively. - map(remapKeys, mappings, updTopVer); + // TODO IGNITE-51 + // map(remapKeys, mappings, updTopVer); } // Add new future. @@ -382,7 +385,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M futId, fut.futureId(), ver, - mappedKeys, + // TODO IGNITE-51 + // mappedKeys, + null, readThrough, reload, topVer, @@ -428,11 +433,14 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M while (true) { GridCacheEntryEx entry = null; + // TODO IGNITE-51. + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + try { if (!reload && allowLocRead) { try { - entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) : - colocated.peekEx(key); + entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(cacheKey) : + colocated.peekEx(cacheKey); // If our DHT cache do has value, then we peek it. if (entry != null) { @@ -456,17 +464,18 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { if (isNew && entry.markObsoleteIfEmpty(ver)) - colocated.removeIfObsolete(key); + colocated.removeIfObsolete(cacheKey); } else { K key0 = key; - if (cctx.portableEnabled()) { - v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); - key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); - } - - locVals.put(key0, v); +// TODO IGNITE-51. +// if (cctx.portableEnabled()) { +// v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); +// key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); +// } +// +// locVals.put(key0, v); return false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/760182ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 8c847dc..0003f6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -128,7 +128,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { long ttl, int hdrId) { - return new GridDhtAtomicCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId); + return new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val, next, ttl, hdrId); } }); @@ -790,7 +790,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { int taskNameHash = ctx.kernalContext().job().currentTaskNameHash(); - final GridNearAtomicUpdateFuture<K, V> updateFut = new GridNearAtomicUpdateFuture<>( + final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( ctx, this, ctx.config().getWriteSynchronizationMode(), @@ -799,13 +799,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { conflictPutMap.keySet() : conflictRmvMap.keySet(), map != null ? map.values() : invokeMap != null ? invokeMap.values() : null, invokeArgs, - conflictPutMap != null ? conflictPutMap.values() : null, + (Collection)(conflictPutMap != null ? conflictPutMap.values() : null), conflictRmvMap != null ? conflictRmvMap.values() : null, retval, rawRetval, - cached, prj != null ? prj.expiry() : null, - filter, + (IgnitePredicate[])filter, subjId, taskNameHash); @@ -854,7 +853,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { int taskNameHash = ctx.kernalContext().job().currentTaskNameHash(); - final GridNearAtomicUpdateFuture<K, V> updateFut = new GridNearAtomicUpdateFuture<>( + final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( ctx, this, ctx.config().getWriteSynchronizationMode(), @@ -866,9 +865,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { keys != null ? null : conflictMap.values(), retval, rawRetval, - cached, (filter != null && prj != null) ? prj.expiry() : null, - filter, + (IgnitePredicate[])filter, subjId, taskNameHash); @@ -1140,7 +1138,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { !ctx.dr().receiveEnabled() // and no DR. ) { // This method can only be used when there are no replicated entries in the batch. - UpdateBatchResult<K, V> updRes = updateWithBatch(node, + UpdateBatchResult updRes = updateWithBatch(node, hasNear, req, res, @@ -1253,7 +1251,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @throws GridCacheEntryRemovedException Should not be thrown. */ @SuppressWarnings("unchecked") - private UpdateBatchResult<K, V> updateWithBatch( + private UpdateBatchResult updateWithBatch( ClusterNode node, boolean hasNear, GridNearAtomicUpdateRequest req, @@ -1276,7 +1274,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { catch (IgniteCheckedException e) { res.addFailedKeys(req.keys(), e); - return new UpdateBatchResult<>(); + return new UpdateBatchResult(); } } @@ -1288,7 +1286,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Collection<KeyCacheObject> rmvKeys = null; - UpdateBatchResult<K, V> updRes = new UpdateBatchResult<>(); + UpdateBatchResult updRes = new UpdateBatchResult(); List<GridDhtCacheEntry> filtered = new ArrayList<>(size); @@ -1351,20 +1349,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, null); + Object keyVal = entry.key().value(ctx); + Object oldVal = CU.value(old, ctx); + Object updatedVal = null; + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(ctx, - entry.key().value(ctx), - old.value(ctx)); + keyVal, + oldVal); - Object updated; + CacheObject updated; CacheInvokeResult invokeRes = null; try { Object computed = entryProcessor.process(invokeEntry, req.invokeArguments()); - updated = ctx.unwrapTemporary(invokeEntry.getValue()); + updatedVal = ctx.unwrapTemporary(invokeEntry.getValue()); - if (ctx.portableEnabled()) - updated = (V)ctx.marshalToPortable(updated); + updated = ctx.toCacheObject(updatedVal); if (computed != null) invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed)); @@ -1381,7 +1382,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (updated == null) { if (intercept) { IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove( - entry.key(), old); + keyVal, oldVal); if (ctx.cancelRemove(interceptorRes)) continue; @@ -1423,10 +1424,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } else { if (intercept) { - updated = (V)ctx.config().getInterceptor().onBeforePut(entry.key(), old, updated); + Object val = ctx.config().getInterceptor().onBeforePut(keyVal, oldVal, updatedVal); - if (updated == null) + if (val == null) continue; + + updated = ctx.toCacheKeyObject(ctx.unwrapTemporary(val)); } // Update previous batch. @@ -1460,7 +1463,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (putMap == null) putMap = new LinkedHashMap<>(size, 1.0f); - putMap.put(entry.key(), ctx.<V>unwrapTemporary(updated)); + putMap.put(entry.key(), updated); } if (entryProcessorMap == null) @@ -1469,10 +1472,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { entryProcessorMap.put(entry.key(), entryProcessor); } else if (op == UPDATE) { - V updated = req.value(i); + CacheObject updated = req.value(i); if (intercept) { - V old = entry.innerGet( + CacheObject old = entry.innerGet( null, /*read swap*/true, /*read through*/ctx.loadPreviousValue(), @@ -1486,12 +1489,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, null); - updated = (V)ctx.config().getInterceptor().onBeforePut(entry.key(), old, updated); + Object val = ctx.config().getInterceptor().onBeforePut(entry.key().value(ctx), + CU.value(old, ctx), + updated.value(ctx)); - if (updated == null) + if (val == null) continue; - updated = ctx.unwrapTemporary(updated); + updated = ctx.toCacheObject(ctx.unwrapTemporary(val)); } assert updated != null; @@ -1505,7 +1510,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert op == DELETE; if (intercept) { - V old = entry.innerGet( + CacheObject old = entry.innerGet( null, /*read swap*/true, /*read through*/ctx.loadPreviousValue(), @@ -1520,7 +1525,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null); IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove( - entry.key(), old); + entry.key().value(ctx), + CU.value(old, ctx)); if (ctx.cancelRemove(interceptorRes)) continue; @@ -1574,7 +1580,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @throws IgniteCheckedException If failed. */ private void reloadIfNeeded(final List<GridDhtCacheEntry> entries) throws IgniteCheckedException { - Map<K, Integer> needReload = null; + Map<KeyCacheObject, Integer> needReload = null; for (int i = 0; i < entries.size(); i++) { GridDhtCacheEntry entry = entries.get(i); @@ -1582,7 +1588,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (entry == null) continue; - V val = entry.rawGetOrUnmarshal(false); + CacheObject val = entry.rawGetOrUnmarshal(false); if (val == null) { if (needReload == null) @@ -1593,10 +1599,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (needReload != null) { - final Map<K, Integer> idxMap = needReload; + final Map<KeyCacheObject, Integer> idxMap = needReload; - ctx.store().loadAllFromStore(null, needReload.keySet(), new CI2<K, V>() { - @Override public void apply(K k, V v) { + ctx.store().loadAllFromStore(null, needReload.keySet(), new CI2<KeyCacheObject, Object>() { + @Override public void apply(KeyCacheObject k, Object v) { Integer idx = idxMap.get(k); if (idx != null) { @@ -1604,7 +1610,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { GridCacheVersion ver = entry.version(); - entry.versionedValue(v, null, ver); + entry.versionedValue(ctx.toCacheObject(v), null, ver); } catch (GridCacheEntryRemovedException e) { assert false : "Entry should not get obsolete while holding lock [entry=" + entry + @@ -1652,7 +1658,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheReturn<Object> retVal = null; Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; - List<K> keys = req.keys(); + List<KeyCacheObject> keys = req.keys(); long topVer = req.topologyVersion(); @@ -1662,11 +1668,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - Map<K, EntryProcessorResult<?>> computedMap = null; + Map<KeyCacheObject, EntryProcessorResult<?>> computedMap = null; // Avoid iterator creation. for (int i = 0; i < keys.size(); i++) { - K k = keys.get(i); + KeyCacheObject k = keys.get(i); GridCacheOperation op = req.operation(); @@ -1699,7 +1705,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); } - GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate( + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, node.id(), locNodeId, @@ -1741,10 +1747,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { newValBytes = null; // Value has been changed. } - EntryProcessor<K, V, ?> entryProcessor = null; + EntryProcessor<Object, Object, Object> entryProcessor = null; if (req.forceTransformBackups() && op == TRANSFORM) - entryProcessor = (EntryProcessor<K, V, ?>)writeVal; + entryProcessor = (EntryProcessor<Object, Object, Object>)writeVal; if (!readersOnly) { dhtFut.addWriteEntry(entry, @@ -1775,7 +1781,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (hasNear) { if (primary && updRes.sendToDht()) { if (!ctx.affinity().belongs(node, entry.partition(), topVer)) { - GridCacheVersionConflictContext<K, V> ctx = updRes.conflictResolveResult(); + GridCacheVersionConflictContext ctx = updRes.conflictResolveResult(); if (ctx != null && ctx.isMerge()) newValBytes = null; @@ -1869,15 +1875,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { List<GridDhtCacheEntry> entries, final GridCacheVersion ver, ClusterNode node, - @Nullable Map<K, V> putMap, - @Nullable Collection<K> rmvKeys, - @Nullable Map<K, EntryProcessor<K, V, ?>> entryProcessorMap, + @Nullable Map<KeyCacheObject, CacheObject> putMap, + @Nullable Collection<KeyCacheObject> rmvKeys, + @Nullable Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, @Nullable GridDhtAtomicUpdateFuture dhtFut, CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, final GridNearAtomicUpdateRequest req, final GridNearAtomicUpdateResponse res, boolean replicate, - UpdateBatchResult<K, V> batchRes, + UpdateBatchResult batchRes, String taskName, @Nullable IgniteCacheExpiryPolicy expiry ) { @@ -1896,17 +1902,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (putMap != null) { // If fast mapping, filter primary keys for write to store. - Map<K, V> storeMap = req.fastMap() ? - F.view(putMap, new P1<K>() { - @Override public boolean apply(K key) { + Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ? + F.view(putMap, new P1<KeyCacheObject>() { + @Override public boolean apply(KeyCacheObject key) { return ctx.affinity().primary(ctx.localNode(), key, req.topologyVersion()); } }) : putMap; try { - ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, new C1<V, IgniteBiTuple<V, GridCacheVersion>>() { - @Override public IgniteBiTuple<V, GridCacheVersion> apply(V v) { + ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() { + @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) { return F.t(v, ver); } })); @@ -1919,9 +1925,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } else { // If fast mapping, filter primary keys for write to store. - Collection<K> storeKeys = req.fastMap() ? - F.view(rmvKeys, new P1<K>() { - @Override public boolean apply(K key) { + Collection<KeyCacheObject> storeKeys = req.fastMap() ? + F.view(rmvKeys, new P1<KeyCacheObject>() { + @Override public boolean apply(KeyCacheObject key) { return ctx.affinity().primary(ctx.localNode(), key, req.topologyVersion()); } }) : @@ -1956,7 +1962,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { // We are holding java-level locks on entries at this point. - V writeVal = op == UPDATE ? putMap.get(entry.key()) : null; + CacheObject writeVal = op == UPDATE ? putMap.get(entry.key()) : null; assert writeVal != null || op == DELETE : "null write value found."; @@ -1971,7 +1977,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); } - GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate( + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, node.id(), locNodeId, @@ -2023,7 +2029,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { byte[] valBytes = valBytesTuple.getIfMarshaled(); - EntryProcessor<K, V, ?> entryProcessor = + EntryProcessor<Object, Object, Object> entryProcessor = entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); if (!batchRes.readersOnly()) @@ -2090,7 +2096,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (storeErr != null) - res.addFailedKeys((Collection<K>)storeErr.failedKeys(), storeErr.getCause()); + res.addFailedKeys(storeErr.failedKeys(), storeErr.getCause(), ctx); return dhtFut; } @@ -2108,7 +2114,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, long topVer) throws GridDhtInvalidPartitionException { if (keys.size() == 1) { - K key = keys.get(0); + KeyCacheObject key = keys.get(0); while (true) { try { @@ -2134,7 +2140,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { List<GridDhtCacheEntry> locked = new ArrayList<>(keys.size()); while (true) { - for (K key : keys) { + for (KeyCacheObject key : keys) { try { GridDhtCacheEntry entry = entryExx(key, topVer); @@ -2152,7 +2158,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean retry = false; for (int i = 0; i < locked.size(); i++) { - GridCacheMapEntry<K, V> entry = locked.get(i); + GridCacheMapEntry entry = locked.get(i); if (entry == null) continue; @@ -2194,9 +2200,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Entries to skip eviction manager notification for. // Enqueue entries while holding locks. - Collection<K> skip = null; + Collection<KeyCacheObject> skip = null; - for (GridCacheMapEntry<K, V> entry : locked) { + for (GridCacheMapEntry entry : locked) { if (entry != null && entry.deleted()) { if (skip == null) skip = new HashSet<>(locked.size(), 1.0f); @@ -2206,7 +2212,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } // Release locks. - for (GridCacheMapEntry<K, V> entry : locked) { + for (GridCacheMapEntry entry : locked) { if (entry != null) UNSAFE.monitorExit(entry); } @@ -2223,7 +2229,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Must touch all entries since update may have deleted entries. // Eviction manager will remove empty entries. - for (GridCacheMapEntry<K, V> entry : locked) { + for (GridCacheMapEntry entry : locked) { if (entry != null && (skip == null || !skip.contains(entry.key()))) ctx.evicts().touch(entry, topVer); } @@ -2286,7 +2292,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { log.debug("Remapping near update request locally: " + req); Collection<?> vals; - Collection<GridCacheDrInfo<V>> drPutVals; + Collection<GridCacheDrInfo<?>> drPutVals; Collection<GridCacheVersion> drRmvVals; if (req.conflictVersions() == null) { @@ -2322,7 +2328,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { drPutVals = null; } - final GridNearAtomicUpdateFuture<K, V> updateFut = new GridNearAtomicUpdateFuture<>( + final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( ctx, this, ctx.config().getWriteSynchronizationMode(), @@ -2334,7 +2340,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { drRmvVals, req.returnValue(), false, - null, req.expiry(), req.filter(), req.subjectId(), @@ -2380,7 +2385,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } - GridDhtAtomicUpdateFuture fut = new GridDhtAtomicUpdateFuture<>(ctx, completionCb, writeVer, updateReq, + GridDhtAtomicUpdateFuture fut = new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); ctx.mvcc().addAtomicFuture(fut.version(), fut); @@ -2392,7 +2397,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param nodeId Sender node ID. * @param res Near get response. */ - private void processNearGetResponse(UUID nodeId, GridNearGetResponse<K, V> res) { + private void processNearGetResponse(UUID nodeId, GridNearGetResponse res) { if (log.isDebugEnabled()) log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']'); @@ -2433,7 +2438,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.nodeId(ctx.localNodeId()); - GridNearAtomicUpdateFuture<K, V> fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); + GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); if (fut != null) fut.onResult(nodeId, res); @@ -2719,7 +2724,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * Result of {@link GridDhtAtomicCache#updateWithBatch} execution. */ - private static class UpdateBatchResult<K, V> { + private static class UpdateBatchResult { /** */ private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted; @@ -2730,7 +2735,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private boolean readersOnly; /** */ - private Map<K, EntryProcessorResult> invokeRes; + private Map<KeyCacheObject, EntryProcessorResult> invokeRes; /** * @param entry Entry. @@ -2765,14 +2770,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @param invokeRes Result for invoke operation. */ - private void invokeResult(Map<K, EntryProcessorResult> invokeRes) { + private void invokeResult(Map<KeyCacheObject, EntryProcessorResult> invokeRes) { this.invokeRes = invokeRes; } /** * @return Result for invoke operation. */ - Map<K, EntryProcessorResult> invokeResults() { + Map<KeyCacheObject, EntryProcessorResult> invokeResults() { return invokeRes; }
