http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 61975d7..d2403e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -316,7 +316,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * */ private void onEntriesLocked() { - ret = new GridCacheReturn(null, tx.localResult(), null, true); + ret = new GridCacheReturn(null, tx.localResult(), true, null, true); for (IgniteTxEntry writeEntry : writes) { IgniteTxEntry txEntry = tx.entry(writeEntry.txKey()); @@ -361,7 +361,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter null, null, null, - null); + null, + txEntry.keepBinary()); if (retVal || txEntry.op() == TRANSFORM) { if (!F.isEmpty(txEntry.entryProcessors())) { @@ -377,8 +378,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { try { - CacheInvokeEntry<Object, Object> invokeEntry = - new CacheInvokeEntry<>(txEntry.context(), key, val, txEntry.cached().version()); + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>( + txEntry.context(), key, val, txEntry.cached().version(), txEntry.keepBinary()); EntryProcessor<Object, Object, Object> processor = t.get1(); @@ -403,7 +404,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } } else if (retVal) - ret.value(cacheCtx, val); + ret.value(cacheCtx, val, txEntry.keepBinary()); } if (hasFilters && !cacheCtx.isAll(cached, txEntry.filters())) { @@ -1555,10 +1556,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (rec && !entry.isInternal()) cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null, - false, null, null, null); + false, null, null, null, false); if (retVal && !invoke) - ret.value(cacheCtx, info.value()); + ret.value(cacheCtx, info.value(), false); } break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index f8be2a7..ad8a402 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -122,9 +122,9 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { concurrency, isolation, invalidate, - timeout, + timeout, txSize, - subjId, + subjId, taskNameHash ); @@ -193,7 +193,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { concurrency, isolation, invalidate, - timeout, + timeout, txSize, subjId, taskNameHash @@ -335,7 +335,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { @Nullable CacheObject val, @Nullable Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessors, long ttl, - boolean skipStore) { + boolean skipStore, + boolean keepBinary) { checkInternal(key); if (isSystemInvalidate()) @@ -351,7 +352,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { -1L, cached, null, - skipStore); + skipStore, + keepBinary); txEntry.entryProcessors(entryProcessors); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index febe9ba..ce736e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -418,7 +418,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda subjId, null, taskName, - expiryPlc); + expiryPlc, + !deserializePortable); if (res != null) { v = res.get1(); @@ -437,7 +438,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda subjId, null, taskName, - expiryPlc); + expiryPlc, + !deserializePortable); } colocated.context().evicts().touch(entry, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 4cd9e84..255640f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheOperation; @@ -834,6 +833,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { subjId, taskNameHash, opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, waitTopFut); @@ -899,6 +899,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { subjId, taskNameHash, opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, true); @@ -971,7 +972,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { subjId, null, taskName, - expiry); + expiry, + !deserializePortable); // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { @@ -1196,7 +1198,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (retVal == null) - retVal = new GridCacheReturn(ctx, node.isLocal(), null, true); + retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true); res.returnValue(retVal); @@ -1316,11 +1318,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { int size = req.keys().size(); - Map<Object, Object> putMap = null; + Map<KeyCacheObject, CacheObject> putMap = null; Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap = null; - Collection<Object> rmvKeys = null; + Collection<KeyCacheObject> rmvKeys = null; List<CacheObject> writeVals = null; @@ -1383,13 +1385,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), entryProcessor, taskName, - null); + null, + req.keepBinary()); Object oldVal = null; Object updatedVal = null; CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, entry.key(), old, - entry.version()); + entry.version(), req.keepBinary()); CacheObject updated; @@ -1418,7 +1421,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (updated == null) { if (intercept) { - CacheLazyEntry e = new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(), old, oldVal); + CacheLazyEntry e = new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(), old, oldVal, req.keepBinary()); IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove(e); @@ -1460,11 +1463,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (rmvKeys == null) rmvKeys = new ArrayList<>(size); - rmvKeys.add(entry.key().value(ctx.cacheObjectContext(), false)); + rmvKeys.add(entry.key()); } else { if (intercept) { - CacheLazyEntry e = new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(), old, oldVal); + CacheLazyEntry e = new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(), old, oldVal, req.keepBinary()); Object val = ctx.config().getInterceptor().onBeforePut(e, updatedVal); @@ -1508,7 +1511,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { writeVals = new ArrayList<>(size); } - putMap.put(CU.value(entry.key(), ctx, false), CU.value(updated, ctx, false)); + putMap.put(entry.key(), updated); writeVals.add(updated); } @@ -1533,10 +1536,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), null, taskName, - null); + null, + req.keepBinary()); Object val = ctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(ctx, entry.key(), - old), + old, req.keepBinary()), updated.value(ctx.cacheObjectContext(), false)); if (val == null) @@ -1552,7 +1556,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { writeVals = new ArrayList<>(size); } - putMap.put(CU.value(entry.key(), ctx, false), CU.value(updated, ctx, false)); + putMap.put(entry.key(), updated); writeVals.add(updated); } else { @@ -1571,10 +1575,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), null, taskName, - null); + null, + req.keepBinary()); IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor() - .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old)); + .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, req.keepBinary())); if (ctx.cancelRemove(interceptorRes)) continue; @@ -1583,7 +1588,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (rmvKeys == null) rmvKeys = new ArrayList<>(size); - rmvKeys.add(entry.key().value(ctx.cacheObjectContext(), false)); + rmvKeys.add(entry.key()); } filtered.add(entry); @@ -1761,6 +1766,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { primary && writeThrough() && !req.skipStore(), !req.skipStore(), req.returnValue(), + req.keepBinary(), expiry, true, true, @@ -1876,6 +1882,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { retVal = new GridCacheReturn(ctx, node.isLocal(), + req.keepBinary(), req.returnValue() ? ret : null, updRes.success()); } @@ -1917,8 +1924,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final GridCacheVersion ver, ClusterNode node, @Nullable List<CacheObject> writeVals, - @Nullable Map<Object, Object> putMap, - @Nullable Collection<Object> rmvKeys, + @Nullable Map<KeyCacheObject, CacheObject> putMap, + @Nullable Collection<KeyCacheObject> rmvKeys, @Nullable Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, @Nullable GridDhtAtomicUpdateFuture dhtFut, CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, @@ -1944,17 +1951,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (putMap != null) { // If fast mapping, filter primary keys for write to store. - Map<Object, Object> storeMap = req.fastMap() ? - F.view(putMap, new P1<Object>() { - @Override public boolean apply(Object key) { + Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ? + F.view(putMap, new P1<CacheObject>() { + @Override public boolean apply(CacheObject key) { return ctx.affinity().primary(ctx.localNode(), key, req.topologyVersion()); } }) : putMap; try { - ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<Object, IgniteBiTuple<Object, GridCacheVersion>>() { - @Override public IgniteBiTuple<Object, GridCacheVersion> apply(Object v) { + ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() { + @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) { return F.t(v, ver); } })); @@ -1967,7 +1974,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } else { // If fast mapping, filter primary keys for write to store. - Collection<Object> storeKeys = req.fastMap() ? + Collection<KeyCacheObject> storeKeys = req.fastMap() ? F.view(rmvKeys, new P1<Object>() { @Override public boolean apply(Object key) { return ctx.affinity().primary(ctx.localNode(), key, req.topologyVersion()); @@ -2030,6 +2037,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*write-through*/false, /*read-through*/false, /*retval*/false, + req.keepBinary(), expiry, /*event*/true, /*metrics*/true, @@ -2054,14 +2062,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( ctx, entry.key(), - updRes.newValue())); + updRes.newValue(), + req.keepBinary())); } else { assert op == DELETE : op; // Old value should be already loaded for 'CacheInterceptor.onBeforeRemove'. ctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(ctx, entry.key(), - updRes.oldValue())); + updRes.oldValue(), req.keepBinary())); } } @@ -2134,8 +2143,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e, ctx); } - if (storeErr != null) - res.addFailedKeys(storeErr.failedKeys(), storeErr.getCause(), ctx); + if (storeErr != null) { + ArrayList<KeyCacheObject> failed = new ArrayList<>(storeErr.failedKeys().size()); + + for (Object failedKey : storeErr.failedKeys()) + failed.add(ctx.toCacheKeyObject(failedKey)); + + res.addFailedKeys(failed, storeErr.getCause(), ctx); + } return dhtFut; } @@ -2361,6 +2376,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), req.taskNameHash(), req.skipStore(), + req.keepBinary(), MAX_RETRIES, true); @@ -2511,6 +2527,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*write-through*/false, /*read-through*/false, /*retval*/false, + req.keepBinary(), /*expiry policy*/null, /*event*/true, /*metrics*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 4ace5c4..233a45a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -234,7 +234,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> this.updateReq.subjectId(), this.updateReq.taskNameHash(), forceTransformBackups ? this.updateReq.invokeArguments() : null, - cctx.deploymentEnabled()); + cctx.deploymentEnabled(), + this.updateReq.keepBinary()); mappings.put(nodeId, updateReq); } @@ -290,7 +291,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> this.updateReq.subjectId(), this.updateReq.taskNameHash(), forceTransformBackups ? this.updateReq.invokeArguments() : null, - cctx.deploymentEnabled()); + cctx.deploymentEnabled(), + this.updateReq.keepBinary()); mappings.put(nodeId, updateReq); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index e55cac9..7b95042 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -139,6 +139,9 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** Task name hash. */ private int taskNameHash; + /** Keep portable flag. */ + private boolean keepBinary; + /** * Empty constructor required by {@link Externalizable}. */ @@ -172,7 +175,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid UUID subjId, int taskNameHash, Object[] invokeArgs, - boolean addDepInfo + boolean addDepInfo, + boolean keepBinary ) { assert invokeArgs == null || forceTransformBackups; @@ -187,6 +191,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid this.taskNameHash = taskNameHash; this.invokeArgs = invokeArgs; this.addDepInfo = addDepInfo; + this.keepBinary = keepBinary; keys = new ArrayList<>(); @@ -419,6 +424,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** + * @return Keep portable flag. + */ + public boolean keepBinary() { + return keepBinary; + } + + /** * @param idx Key index. * @return Value. */ @@ -642,84 +654,90 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid writer.incrementState(); case 9: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + if (!writer.writeBoolean("keepBinary", keepBinary)) return false; writer.incrementState(); case 10: - if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 11: - if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) + if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 12: - if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) return false; writer.incrementState(); case 13: - if (!writer.writeMessage("nearTtls", nearTtls)) + if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 14: - if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("nearTtls", nearTtls)) return false; writer.incrementState(); case 15: - if (!writer.writeUuid("nodeId", nodeId)) + if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 16: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeUuid("nodeId", nodeId)) return false; writer.incrementState(); case 17: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 18: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 19: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("ttls", ttls)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 21: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("ttls", ttls)) return false; writer.incrementState(); case 22: + if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 23: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -790,7 +808,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 9: - keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + keepBinary = reader.readBoolean("keepBinary"); if (!reader.isLastRead()) return false; @@ -798,7 +816,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 10: - nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); + keys = reader.readCollection("keys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -806,7 +824,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 11: - nearExpireTimes = reader.readMessage("nearExpireTimes"); + nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) return false; @@ -814,7 +832,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 12: - nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG); + nearExpireTimes = reader.readMessage("nearExpireTimes"); if (!reader.isLastRead()) return false; @@ -822,7 +840,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 13: - nearTtls = reader.readMessage("nearTtls"); + nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -830,7 +848,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 14: - nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); + nearTtls = reader.readMessage("nearTtls"); if (!reader.isLastRead()) return false; @@ -838,7 +856,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 15: - nodeId = reader.readUuid("nodeId"); + nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -846,7 +864,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 16: - subjId = reader.readUuid("subjId"); + nodeId = reader.readUuid("nodeId"); if (!reader.isLastRead()) return false; @@ -854,6 +872,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 17: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -865,7 +891,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 18: + case 19: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -873,7 +899,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 19: + case 20: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -881,7 +907,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 20: + case 21: ttls = reader.readMessage("ttls"); if (!reader.isLastRead()) @@ -889,7 +915,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 21: + case 22: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -897,7 +923,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 22: + case 23: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -917,7 +943,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 23; + return 24; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ae662c8..02c62d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -138,6 +138,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Skip store flag. */ private final boolean skipStore; + /** */ + private final boolean keepBinary; + /** Wait for topology future flag. */ private final boolean waitTopFut; @@ -184,6 +187,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> UUID subjId, int taskNameHash, boolean skipStore, + boolean keepBinary, int remapCnt, boolean waitTopFut ) { @@ -209,6 +213,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> this.subjId = subjId; this.taskNameHash = taskNameHash; this.skipStore = skipStore; + this.keepBinary = keepBinary; this.waitTopFut = waitTopFut; if (log == null) @@ -474,7 +479,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); if (syncMode == FULL_ASYNC) - onDone(new GridCacheReturn(cctx, true, null, true)); + onDone(new GridCacheReturn(cctx, true, true, null, true)); } catch (IgniteCheckedException e) { state.onSendError(req, e); @@ -523,7 +528,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } if (syncMode == FULL_ASYNC) - onDone(new GridCacheReturn(cctx, true, null, true)); + onDone(new GridCacheReturn(cctx, true, true, null, true)); } /** @@ -893,7 +898,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> assert pendingMappings != null; if (size == 0) - onDone(new GridCacheReturn(cctx, true, null, true)); + onDone(new GridCacheReturn(cctx, true, true, null, true)); else doUpdate(pendingMappings); } @@ -1050,6 +1055,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> subjId, taskNameHash, skipStore, + keepBinary, cctx.kernalContext().clientNode(), cctx.deploymentEnabled()); @@ -1143,6 +1149,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> subjId, taskNameHash, skipStore, + keepBinary, cctx.kernalContext().clientNode(), cctx.deploymentEnabled()); @@ -1188,7 +1195,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> Collection<Object> keys = new ArrayList<>(failedKeys.size()); for (KeyCacheObject key : failedKeys) - keys.add(key.value(cctx.cacheObjectContext(), false)); + keys.add(cctx.cacheObjectContext().unwrapPortableIfNeeded(key, keepBinary, false)); err0.add(keys, err, topVer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 33fa4bd..b7100dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -151,6 +151,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** */ private boolean clientReq; + /** Keep portable flag. */ + private boolean keepBinary; + /** * Empty constructor required by {@link Externalizable}. */ @@ -197,6 +200,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri @Nullable UUID subjId, int taskNameHash, boolean skipStore, + boolean keepBinary, boolean clientReq, boolean addDepInfo ) { @@ -219,6 +223,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.subjId = subjId; this.taskNameHash = taskNameHash; this.skipStore = skipStore; + this.keepBinary = keepBinary; this.clientReq = clientReq; this.addDepInfo = addDepInfo; @@ -336,6 +341,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** + * @return Keep portable flag. + */ + public boolean keepBinary() { + return keepBinary; + } + + /** * @param key Key to add. * @param val Optional update value. * @param conflictTtl Conflict TTL (optional). @@ -685,66 +697,72 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 14: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + if (!writer.writeBoolean("keepBinary", keepBinary)) return false; writer.incrementState(); case 15: - if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 16: - if (!writer.writeBoolean("retval", retval)) + if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) return false; writer.incrementState(); case 17: - if (!writer.writeBoolean("skipStore", skipStore)) + if (!writer.writeBoolean("retval", retval)) return false; writer.incrementState(); case 18: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("skipStore", skipStore)) return false; writer.incrementState(); case 19: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 20: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 21: - if (!writer.writeBoolean("topLocked", topLocked)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 22: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeBoolean("topLocked", topLocked)) return false; writer.incrementState(); case 23: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 24: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 25: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -855,7 +873,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 14: - keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + keepBinary = reader.readBoolean("keepBinary"); if (!reader.isLastRead()) return false; @@ -863,6 +881,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 15: + keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 16: byte opOrd; opOrd = reader.readByte("op"); @@ -874,7 +900,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 16: + case 17: retval = reader.readBoolean("retval"); if (!reader.isLastRead()) @@ -882,7 +908,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 17: + case 18: skipStore = reader.readBoolean("skipStore"); if (!reader.isLastRead()) @@ -890,7 +916,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 18: + case 19: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -898,7 +924,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 19: + case 20: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -910,7 +936,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 20: + case 21: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -918,7 +944,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 21: + case 22: topLocked = reader.readBoolean("topLocked"); if (!reader.isLastRead()) @@ -926,7 +952,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 22: + case 23: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -934,7 +960,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 23: + case 24: updateVer = reader.readMessage("updateVer"); if (!reader.isLastRead()) @@ -942,7 +968,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 24: + case 25: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -962,7 +988,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 25; + return 26; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 6536af3..b164e7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -377,12 +377,11 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @param e Error cause. * @param ctx Context. */ - public synchronized void addFailedKeys(Collection<Object> keys, Throwable e, GridCacheContext ctx) { + public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx) { if (failedKeys == null) failedKeys = new ArrayList<>(keys.size()); - for (Object key : keys) - failedKeys.add(ctx.toCacheKeyObject(key)); + failedKeys.addAll(keys); if (err == null) err = new IgniteCheckedException("Failed to update keys on primary node."); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index f03b461..907c68d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -216,7 +216,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte false, opCtx != null && opCtx.skipStore()); } - }); + }, opCtx); } AffinityTopologyVersion topVer = tx == null ? @@ -351,7 +351,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte subjId, null, taskName, - expiryPlc); + expiryPlc, + !deserializePortable); if (res != null) { v = res.get1(); @@ -370,7 +371,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte subjId, null, taskName, - expiryPlc); + expiryPlc, + !deserializePortable); } // Entry was not in memory or in swap, so we remove it from cache. @@ -487,7 +489,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte timeout, accessTtl, CU.empty0(), - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.isKeepBinary()); // Future will be added to mvcc only if it was mapped to remote nodes. fut.map(); @@ -746,7 +749,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte final long timeout, final long accessTtl, @Nullable final CacheEntryPredicate[] filter, - final boolean skipStore + final boolean skipStore, + final boolean keepBinary ) { assert keys != null; @@ -769,7 +773,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte timeout, accessTtl, filter, - skipStore); + skipStore, + keepBinary); } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); @@ -793,7 +798,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte timeout, accessTtl, filter, - skipStore); + skipStore, + keepBinary); } } ); @@ -827,7 +833,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte final long timeout, final long accessTtl, @Nullable final CacheEntryPredicate[] filter, - boolean skipStore) { + boolean skipStore, + boolean keepBinary) { int cnt = keys.size(); if (tx == null) { @@ -843,7 +850,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte threadId, accessTtl, filter, - skipStore); + skipStore, + keepBinary); // Add before mapping. if (!ctx.mvcc().addFuture(fut)) @@ -909,7 +917,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte tx.implicit(), txRead, accessTtl, - skipStore); + skipStore, + keepBinary); return new GridDhtEmbeddedFuture<>( new C2<GridCacheReturn, Exception, Exception>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 365b46b..13b3cab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -143,6 +143,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** Skip store flag. */ private final boolean skipStore; + /** Keep binary. */ + private final boolean keepBinary; + /** * @param cctx Registry. * @param keys Keys to lock. @@ -163,7 +166,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture long timeout, long accessTtl, CacheEntryPredicate[] filter, - boolean skipStore) { + boolean skipStore, + boolean keepBinary) { super(cctx.kernalContext(), CU.boolReducer()); assert keys != null; @@ -177,6 +181,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture this.accessTtl = accessTtl; this.filter = filter; this.skipStore = skipStore; + this.keepBinary = keepBinary; ignoreInterrupts(true); @@ -851,6 +856,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, skipStore, + keepBinary, clientFirst, cctx.deploymentEnabled()); @@ -1021,7 +1027,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture timeout, accessTtl, filter, - skipStore); + skipStore, + keepBinary); // Add new future. add(new GridEmbeddedFuture<>( @@ -1431,7 +1438,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture false, CU.subjectId(tx, cctx.shared()), null, - tx == null ? null : tx.resolveTaskName()); + tx == null ? null : tx.resolveTaskName(), + keepBinary); } i++; http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index db0e780..7970a44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -553,7 +553,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec if (rec && !entry.isInternal()) cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null, - false, null, null, null); + false, null, null, null, false); } } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index e993a88..e1ca25b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -556,7 +556,7 @@ public class GridDhtPartitionDemandPool { if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal()) cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, - false, null, null, null); + false, null, null, null, false); } else if (log.isDebugEnabled()) log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 1bf03a9..648a248 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -195,6 +195,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { null, ttl, expireTime, + req.keepBinary(), req.nodeId(), req.subjectId(), taskName); @@ -224,6 +225,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { @Nullable byte[] valBytes, long ttl, long expireTime, + boolean keepPortable, UUID nodeId, UUID subjId, String taskName @@ -249,6 +251,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*write-through*/false, /*read-through*/false, /*retval*/false, + keepPortable, /**expiry policy*/null, /*event*/true, /*metrics*/true, @@ -347,6 +350,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*write-through*/false, /*read-through*/false, /*retval*/false, + req.keepBinary(), null, /*event*/true, /*metrics*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index d558cc5..afdc5f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -376,6 +376,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { long ttl, long expireTime, boolean evt, + boolean keepBinary, AffinityTopologyVersion topVer, UUID subjId) throws IgniteCheckedException, GridCacheEntryRemovedException { @@ -417,8 +418,20 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) - cctx.events().addEvent(partition(), key, tx, null, EVT_CACHE_OBJECT_READ, - val, val != null, old, hasVal, subjId, null, null); + cctx.events().addEvent( + partition(), + key, + tx, + null, + EVT_CACHE_OBJECT_READ, + val, + val != null, + old, + hasVal, + subjId, + null, + null, + keepBinary); return ret; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index ae1d43c..685b998 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -270,9 +270,30 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap Map<KeyCacheObject, GridNearCacheEntry> savedEntries = null; - // Assign keys to primary nodes. - for (KeyCacheObject key : keys) - savedEntries = map(key, mappings, topVer, mapped, savedEntries); + { + boolean success = false; + + try { + // Assign keys to primary nodes. + for (KeyCacheObject key : keys) + savedEntries = map(key, mappings, topVer, mapped, savedEntries); + + success = true; + } + finally { + // Exception has been thrown, must release reserved near entries. + if (!success) { + GridCacheVersion obsolete = cctx.versions().next(topVer); + + for (GridNearCacheEntry reserved : savedEntries.values()) { + reserved.releaseEviction(); + + if (reserved.markObsolete(obsolete)) + reserved.context().cache().removeEntry(reserved); + } + } + } + } if (isDone()) return; @@ -419,7 +440,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap subjId, null, taskName, - expiryPlc); + expiryPlc, + !deserializePortable); if (res != null) { v = res.get1(); @@ -438,7 +460,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap subjId, null, taskName, - expiryPlc); + expiryPlc, + !deserializePortable); } } @@ -466,7 +489,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap subjId, null, taskName, - expiryPlc); + expiryPlc, + !deserializePortable); if (res != null) { v = res.get1(); @@ -485,7 +509,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap subjId, null, taskName, - expiryPlc); + expiryPlc, + !deserializePortable); } // Entry was not in memory or in swap, so we remove it from cache. @@ -537,11 +562,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } else { - K key0 = key.value(cctx.cacheObjectContext(), true); - V val0 = v.value(cctx.cacheObjectContext(), true); - - val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable); - key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable); + K key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); + V val0 = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } @@ -697,6 +719,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap info.ttl(), info.expireTime(), true, + !deserializePortable, topVer, subjId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index c5b55bd..21aa457 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -152,6 +152,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean /** Skip store flag. */ private final boolean skipStore; + /** Keep binary context flag. */ + private final boolean keepBinary; + /** * @param cctx Registry. * @param keys Keys to lock. @@ -172,7 +175,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean long timeout, long accessTtl, CacheEntryPredicate[] filter, - boolean skipStore) { + boolean skipStore, + boolean keepBinary) { super(cctx.kernalContext(), CU.boolReducer()); assert keys != null; @@ -186,6 +190,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean this.accessTtl = accessTtl; this.filter = filter; this.skipStore = skipStore; + this.keepBinary = keepBinary; ignoreInterrupts(true); @@ -968,6 +973,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, skipStore, + keepBinary, clientFirst, cctx.deploymentEnabled()); @@ -1150,7 +1156,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean hasBytes, CU.subjectId(tx, cctx.shared()), null, - inTx() ? tx.resolveTaskName() : null); + inTx() ? tx.resolveTaskName() : null, + keepBinary); if (cctx.cache().configuration().isStatisticsEnabled()) cctx.cache().metrics0().onRead(oldVal != null); @@ -1540,7 +1547,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean hasOldVal, CU.subjectId(tx, cctx.shared()), null, - inTx() ? tx.resolveTaskName() : null); + inTx() ? tx.resolveTaskName() : null, + keepBinary); if (cctx.cache().configuration().isStatisticsEnabled()) cctx.cache().metrics0().onRead(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index 165da84..4d875d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -145,6 +145,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { int taskNameHash, long accessTtl, boolean skipStore, + boolean keepBinary, boolean firstClientReq, boolean addDepInfo @@ -164,6 +165,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { keyCnt, txSize, skipStore, + keepBinary, addDepInfo); assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 0e8aa0d..65a054c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -147,7 +147,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> false, skipStore); } - }); + }, opCtx); } subjId = ctx.subjectIdPerCall(subjId, opCtx); @@ -340,7 +340,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> GridCacheOperation.NOOP, null /*Value.*/, null /*dr version*/, - req.skipStore()); + req.skipStore(), + req.keepBinary()); } // Add remote candidate before reordering. @@ -454,7 +455,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> timeout, accessTtl, CU.empty0(), - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.isKeepBinary()); if (!ctx.mvcc().addFuture(fut)) throw new IllegalStateException("Duplicate future ID: " + fut); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 883c285..e27d632 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -350,6 +350,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { final Collection<KeyCacheObject> keys, boolean skipVals, final boolean needVer, + boolean keepBinary, final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c ) { if (cacheCtx.isNear()) { @@ -410,7 +411,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { else { assert cacheCtx.isLocal(); - return super.loadMissing(cacheCtx, readThrough, async, keys, skipVals, needVer, c); + return super.loadMissing(cacheCtx, readThrough, async, keys, skipVals, keepBinary, needVer, c); } } @@ -1079,7 +1080,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { boolean implicit, boolean read, long accessTtl, - boolean skipStore) { + boolean skipStore, + boolean keepBinary) { assert pessimistic(); try { @@ -1108,7 +1110,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { isolation, accessTtl, CU.empty0(), - skipStore); + skipStore, + keepBinary); return new GridEmbeddedFuture<>( fut, http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index 87c68b2..58ee0c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -117,7 +117,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { concurrency, isolation, invalidate, - timeout, + timeout, txSize, subjId, taskNameHash @@ -187,9 +187,9 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { concurrency, isolation, invalidate, - timeout, + timeout, txSize, - subjId, + subjId, taskNameHash ); @@ -356,7 +356,8 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { GridCacheOperation op, CacheObject val, @Nullable GridCacheVersion drVer, - boolean skipStore + boolean skipStore, + boolean keepBinary ) throws IgniteCheckedException { checkInternal(key); @@ -389,7 +390,8 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { -1L, cached, drVer, - skipStore); + skipStore, + keepBinary); writeMap.put(key, txEntry); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java index 85ed881..0ceae20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java @@ -84,7 +84,9 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { long timeout, boolean reenter, boolean tx, - boolean implicitSingle) throws GridCacheEntryRemovedException { + boolean implicitSingle, + boolean keepBinary + ) throws GridCacheEntryRemovedException { GridCacheMvccCandidate prev; GridCacheMvccCandidate cand; GridCacheMvccCandidate owner; @@ -141,7 +143,7 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { // Event notification. if (cctx.events().isRecordable(EVT_CACHE_OBJECT_LOCKED)) cctx.events().addEvent(partition(), key, cand.nodeId(), cand, EVT_CACHE_OBJECT_LOCKED, val, hasVal, - val, hasVal, null, null, null); + val, hasVal, null, null, null, keepBinary); } checkOwnerChanged(prev, owner); @@ -207,7 +209,8 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { @Override public boolean tmLock(IgniteInternalTx tx, long timeout, @Nullable GridCacheVersion serOrder, - GridCacheVersion serReadVer) + GridCacheVersion serReadVer, + boolean keepBinary) throws GridCacheEntryRemovedException { GridCacheMvccCandidate cand = addLocal( tx.threadId(), @@ -217,7 +220,8 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { timeout, /*reenter*/false, /*tx*/true, - tx.implicitSingle() + tx.implicitSingle(), + keepBinary ); if (cand != null) { @@ -352,7 +356,7 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { // Event notification. if (cctx.events().isRecordable(EVT_CACHE_OBJECT_UNLOCKED)) cctx.events().addEvent(partition(), key, prev.nodeId(), prev, EVT_CACHE_OBJECT_UNLOCKED, val, hasVal, - val, hasVal, null, null, null); + val, hasVal, null, null, null, true); } checkOwnerChanged(prev, owner); @@ -408,7 +412,7 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { // Event notification. if (cctx.events().isRecordable(EVT_CACHE_OBJECT_UNLOCKED)) cctx.events().addEvent(partition(), key, doomed.nodeId(), doomed, EVT_CACHE_OBJECT_UNLOCKED, - val, hasVal, val, hasVal, null, null, null); + val, hasVal, val, hasVal, null, null, null, true); } checkOwnerChanged(prev, owner); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java index cb14b4c..64820ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java @@ -230,7 +230,8 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> timeout, !inTx(), inTx(), - implicitSingle() + implicitSingle(), + true ); entries.add(entry);
