Repository: ignite Updated Branches: refs/heads/ignite-2523-1 586ac9a75 -> f9c692c81
Reworked filter in single update - it is no longer serialized as is. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b674fe9e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b674fe9e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b674fe9e Branch: refs/heads/ignite-2523-1 Commit: b674fe9ef62eec47d175e03cf787ee72bfff6c6a Parents: 586ac9a Author: vozerov-gridgain <[email protected]> Authored: Wed Apr 20 10:44:12 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Apr 20 10:44:12 2016 +0300 ---------------------------------------------------------------------- .../cache/CacheEntryPredicateContainsValue.java | 7 ++ .../processors/cache/CacheOperationFilter.java | 11 ++ .../dht/atomic/GridDhtAtomicCache.java | 3 +- .../GridNearAtomicAbstractUpdateFuture.java | 7 -- .../GridNearAtomicSingleUpdateFuture.java | 44 ++++++- .../GridNearAtomicSingleUpdateRequest.java | 121 ++++++++++--------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 7 +- 7 files changed, 128 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java index 3db8ae8..7a1ee8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java @@ -66,6 +66,13 @@ public class CacheEntryPredicateContainsValue extends 
CacheEntryPredicateAdapter return F.eq(thisVal, cacheVal); } + /** + * @return Value. + */ + public CacheObject value() { + return val; + } + /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { val.finishUnmarshal(ctx.cacheObjectContext(), ldr); http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java index 7fdfaac..ccd8863 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java @@ -35,6 +35,17 @@ public enum CacheOperationFilter { /** Equals to value. */ EQUALS_VAL; + /** Enum values. */ + private static final CacheOperationFilter[] VALS = values(); + + /** + * @param ord Ordinal value. + * @return Enum value. + */ + @Nullable public static CacheOperationFilter fromOrdinal(int ord) { + return ord < 0 || ord >= VALS.length ? null : VALS[ord]; + } + /** * Create predicate from operation filter. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index cbda827..f4d9aa8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1098,6 +1098,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { conflictRmvVer = ctx.versions().next(dcId); } + // TODO: Optimize - no array allocs! CacheEntryPredicate[] filters = CU.filterArray(filter); if (conflictPutVal == null && conflictRmvVer == null && !isFastMap(filters, op)) { @@ -1112,7 +1113,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { retval, false, opCtx != null ? 
opCtx.expiry() : null, - filters, + filter, ctx.subjectIdPerCall(null, opCtx), ctx.kernalContext().job().currentTaskNameHash(), opCtx != null && opCtx.skipStore(), http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 7f52299..5cbe72a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -76,9 +75,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt /** Expiry policy. */ protected final ExpiryPolicy expiryPlc; - /** Optional filter. */ - protected final CacheEntryPredicate[] filter; - /** Subject ID. 
*/ protected final UUID subjId; @@ -141,7 +137,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt * @param retval Return value flag. * @param rawRetval Raw return value flag. * @param expiryPlc Expiry policy. - * @param filter Filter. * @param subjId Subject ID. * @param taskNameHash Task name hash. * @param skipStore Skip store flag. @@ -158,7 +153,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt boolean retval, boolean rawRetval, @Nullable ExpiryPolicy expiryPlc, - CacheEntryPredicate[] filter, UUID subjId, int taskNameHash, boolean skipStore, @@ -177,7 +171,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt this.retval = retval; this.rawRetval = rawRetval; this.expiryPlc = expiryPlc; - this.filter = filter; this.subjId = subjId; this.taskNameHash = taskNameHash; this.skipStore = skipStore; http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 2bebc1d..196de69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -25,6 +25,12 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicateContainsValue; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicateHasValue; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicateNoValue; +import org.apache.ignite.internal.processors.cache.CacheEntrySerializablePredicate; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheOperationFilter; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; @@ -60,6 +66,9 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA * DHT atomic cache near update future. */ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture { + /** Optional filter. 
*/ + private final CacheEntryPredicate filter; + /** Keys */ private Object key; @@ -100,7 +109,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda final boolean retval, final boolean rawRetval, @Nullable ExpiryPolicy expiryPlc, - final CacheEntryPredicate[] filter, + CacheEntryPredicate filter, UUID subjId, int taskNameHash, boolean skipStore, @@ -108,11 +117,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda int remapCnt, boolean waitTopFut ) { - super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, + super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, subjId, taskNameHash, skipStore, keepBinary, remapCnt, waitTopFut); assert subjId != null; + this.filter = filter; + this.key = key; this.val = val; } @@ -624,6 +635,30 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda GridNearAtomicAbstractUpdateRequest req; if (single) { + // TODO: Refactor that? 
+ CacheOperationFilter filter0; + CacheObject filterVal = null; + + if (filter == null) + filter0 = CacheOperationFilter.ALWAYS; + else { + if (filter instanceof CacheEntrySerializablePredicate) { + CacheEntryPredicate pred = ((CacheEntrySerializablePredicate)filter).predicate(); + + if (pred instanceof CacheEntryPredicateHasValue) + filter0 = CacheOperationFilter.HAS_VAL; + else { + assert pred instanceof CacheEntryPredicateNoValue; + + filter0 = CacheOperationFilter.NO_VAL; + } + } + else { + filter0 = CacheOperationFilter.EQUALS_VAL; + filterVal = ((CacheEntryPredicateContainsValue)filter).value(); + } + } + req = new GridNearAtomicSingleUpdateRequest( cctx.cacheId(), primary.id(), @@ -637,7 +672,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda retval, expiryPlc, invokeArgs, - filter, + filter0, + filterVal, subjId, taskNameHash, skipStore, @@ -661,7 +697,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda retval, expiryPlc, invokeArgs, - filter, + CU.filterArray(filter), subjId, taskNameHash, skipStore, http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index 0a19eb4..efef7d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -23,6 +23,7 @@ import 
org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheOperationFilter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -30,7 +31,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -44,7 +44,6 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import java.io.Externalizable; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -116,7 +115,10 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd private byte[] expiryPlcBytes; /** Filter. */ - private CacheEntryPredicate[] filter; + private CacheOperationFilter filter; + + /** Filter value (expected value). */ + private CacheObject filterVal; /** Subject ID. 
*/ private UUID subjId; @@ -183,7 +185,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd boolean retval, @Nullable ExpiryPolicy expiryPlc, @Nullable Object[] invokeArgs, - @Nullable CacheEntryPredicate[] filter, + CacheOperationFilter filter, + @Nullable CacheObject filterVal, @Nullable UUID subjId, int taskNameHash, boolean skipStore, @@ -209,6 +212,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd this.expiryPlc = expiryPlc; this.invokeArgs = invokeArgs; this.filter = filter; + this.filterVal = filterVal; this.subjId = subjId; this.taskNameHash = taskNameHash; this.skipStore = skipStore; @@ -305,12 +309,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd /** {@inheritDoc} */ @Override @Nullable public CacheEntryPredicate[] filter() { - return filter; + // TODO: Optimize - no allocs! + return CU.filterArray(filter.createPredicate(filterVal)); } /** {@inheritDoc} */ @Override public boolean hasFilter() { - return !F.isEmpty(filter); + return filter != null; } /** {@inheritDoc} */ @@ -421,21 +426,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd prepareMarshalCacheObject(key, cctx); - if (filter != null) { - boolean hasFilter = false; - - for (CacheEntryPredicate p : filter) { - if (p != null) { - hasFilter = true; - - p.prepareMarshal(cctx); - } - } - - if (!hasFilter) - filter = null; - } - if (expiryPlc != null && expiryPlcBytes == null) expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); @@ -452,6 +442,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd } else prepareMarshalCacheObject(val, cctx); + + prepareMarshalCacheObject(filterVal, cctx); } /** {@inheritDoc} */ @@ -472,12 +464,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd else finishUnmarshalCacheObject(val, cctx, ldr); - if (filter != null) { - for 
(CacheEntryPredicate p : filter) { - if (p != null) - p.finishUnmarshal(cctx, ldr); - } - } + finishUnmarshalCacheObject(filterVal, cctx, ldr); if (expiryPlcBytes != null && expiryPlc == null) expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); @@ -528,95 +515,100 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd writer.incrementState(); case 7: - if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) + if (!writer.writeByte("filter", (byte)op.ordinal())) return false; writer.incrementState(); case 8: - if (!writer.writeMessage("futVer", futVer)) + if (!writer.writeMessage("filterVal", filterVal)) return false; writer.incrementState(); case 9: - if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeMessage("futVer", futVer)) return false; writer.incrementState(); case 10: - if (!writer.writeBoolean("keepBinary", keepBinary)) + if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 11: - if (!writer.writeMessage("key", key)) + if (!writer.writeBoolean("keepBinary", keepBinary)) return false; writer.incrementState(); case 12: - if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) + if (!writer.writeMessage("key", key)) return false; writer.incrementState(); case 13: - if (!writer.writeBoolean("retval", retval)) + if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) return false; writer.incrementState(); case 14: - if (!writer.writeBoolean("skipStore", skipStore)) + if (!writer.writeBoolean("retval", retval)) return false; writer.incrementState(); case 15: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("skipStore", skipStore)) return false; writer.incrementState(); case 16: - if (!writer.writeByte("syncMode", syncMode != null ? 
(byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 17: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 18: - if (!writer.writeBoolean("topLocked", topLocked)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 19: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeBoolean("topLocked", topLocked)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 21: - if (!writer.writeMessage("val", val)) + if (!writer.writeMessage("updateVer", updateVer)) return false; writer.incrementState(); + case 22: + if (!writer.writeMessage("val", val)) + return false; + + writer.incrementState(); } return true; @@ -666,15 +658,19 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); case 7: - filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); + byte filterOrd; + + filterOrd = reader.readByte("filter"); if (!reader.isLastRead()) return false; + filter = CacheOperationFilter.fromOrdinal(filterOrd); + reader.incrementState(); case 8: - futVer = reader.readMessage("futVer"); + filterVal = reader.readMessage("filterVal"); if (!reader.isLastRead()) return false; @@ -682,7 +678,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); case 9: - invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); + futVer = reader.readMessage("futVer"); if (!reader.isLastRead()) return false; @@ -690,7 +686,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd 
reader.incrementState(); case 10: - keepBinary = reader.readBoolean("keepBinary"); + invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); if (!reader.isLastRead()) return false; @@ -698,7 +694,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); case 11: - key = reader.readMessage("key"); + keepBinary = reader.readBoolean("keepBinary"); if (!reader.isLastRead()) return false; @@ -706,6 +702,14 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); case 12: + key = reader.readMessage("key"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 13: byte opOrd; opOrd = reader.readByte("op"); @@ -717,7 +721,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); - case 13: + case 14: retval = reader.readBoolean("retval"); if (!reader.isLastRead()) @@ -725,7 +729,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); - case 14: + case 15: skipStore = reader.readBoolean("skipStore"); if (!reader.isLastRead()) @@ -733,7 +737,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); - case 15: + case 16: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -741,7 +745,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); - case 16: + case 17: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -753,7 +757,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); - case 17: + case 18: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -761,7 +765,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); - 
case 18: + case 19: topLocked = reader.readBoolean("topLocked"); if (!reader.isLastRead()) @@ -769,7 +773,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); - case 19: + case 20: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -777,7 +781,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); - case 20: + case 21: updateVer = reader.readMessage("updateVer"); if (!reader.isLastRead()) @@ -785,7 +789,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd reader.incrementState(); - case 21: + case 22: val = reader.readMessage("val"); if (!reader.isLastRead()) @@ -815,12 +819,11 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 22; + return 23; } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridNearAtomicSingleUpdateRequest.class, this, "filter", Arrays.toString(filter), - "parent", super.toString()); + return S.toString(GridNearAtomicSingleUpdateRequest.class, this, "parent", super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index edebd8c..7021e2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -67,6 +67,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu /** Fast map flag. */ private final boolean fastMap; + /** Optional filter. */ + private final CacheEntryPredicate[] filter; + /** Keys */ private Collection<?> keys; @@ -134,7 +137,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu int remapCnt, boolean waitTopFut ) { - super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, + super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, subjId, taskNameHash, skipStore, keepBinary, remapCnt, waitTopFut); assert vals == null || vals.size() == keys.size(); @@ -142,6 +145,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu assert conflictRmvVals == null || conflictRmvVals.size() == keys.size(); assert subjId != null; + this.filter = filter; + this.keys = keys; this.vals = vals; this.conflictPutVals = conflictPutVals;
