http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 1219f2f..72a60d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -78,6 +78,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @GridDirectCollection(CacheObject.class) private List<CacheObject> vals; + /** Previous values. */ + @GridToStringInclude + @GridDirectCollection(CacheObject.class) + private List<CacheObject> prevVals; + /** Conflict versions. */ @GridDirectCollection(GridCacheVersion.class) private List<GridCacheVersion> conflictVers; @@ -139,10 +144,19 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** Task name hash. */ private int taskNameHash; + /** Partition. */ + private GridLongList updateCntrs; + /** On response flag. Access should be synced on future. */ @GridDirectTransient private boolean onRes; + @GridDirectTransient + private List<Integer> partIds; + + @GridDirectTransient + private List<CacheObject> localPrevVals; + /** * Empty constructor required by {@link Externalizable}. 
*/ @@ -193,6 +207,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid this.addDepInfo = addDepInfo; keys = new ArrayList<>(); + partIds = new ArrayList<>(); + localPrevVals = new ArrayList<>(); if (forceTransformBackups) { entryProcessors = new ArrayList<>(); @@ -216,15 +232,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @param ttl TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). + * @param addPrevVal If {@code true} adds previous value. + * @param prevVal Previous value. */ public void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + boolean addPrevVal, + int partId, + @Nullable CacheObject prevVal, + @Nullable Long updateIdx) { keys.add(key); + partIds.add(partId); + + localPrevVals.add(prevVal); + if (forceTransformBackups) { assert entryProcessor != null; @@ -233,6 +259,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid else vals.add(val); + if (addPrevVal) { + if (prevVals == null) + prevVals = new ArrayList<>(); + + prevVals.add(prevVal); + } + + if (updateIdx != null) { + if (updateCntrs == null) + updateCntrs = new GridLongList(); + + updateCntrs.add(updateIdx); + } + // In case there is no conflict, do not create the list. 
if (conflictVer != null) { if (conflictVers == null) { @@ -283,8 +323,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, - long expireTime) - { + long expireTime) { if (nearKeys == null) { nearKeys = new ArrayList<>(); @@ -415,6 +454,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** + * @param idx Partition index. + * @return Partition id. + */ + public int partitionId(int idx) { + return partIds.get(idx); + } + + /** + * @param updCntr Update counter. + * @return Update counter. + */ + public Long updateCounter(int updCntr) { + if (updateCntrs != null && updCntr < updateCntrs.size()) + return updateCntrs.get(updCntr); + + return null; + } + + /** * @param idx Near key index. * @return Key. */ @@ -435,6 +493,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** * @param idx Key index. + * @return Value. + */ + @Nullable public CacheObject previousValue(int idx) { + if (prevVals != null) + return prevVals.get(idx); + + return null; + } + + /** + * @param idx Key index. + * @return Value. + */ + @Nullable public CacheObject localPreviousValue(int idx) { + return localPrevVals.get(idx); + } + + /** + * @param idx Key index. * @return Entry processor. 
*/ @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { @@ -544,8 +621,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid return invokeArgs; } - /** {@inheritDoc} - * @param ctx*/ + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); @@ -695,42 +771,54 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid writer.incrementState(); case 16: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 17: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 18: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 19: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("ttls", ttls)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 21: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("ttls", ttls)) return false; writer.incrementState(); case 22: + if (!writer.writeMessage("updateCntrs", updateCntrs)) + return false; + + writer.incrementState(); + + case 23: + if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 24: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -857,7 +945,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 16: - subjId = 
reader.readUuid("subjId"); + prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -865,6 +953,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 17: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -876,7 +972,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 18: + case 19: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -884,7 +980,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 19: + case 20: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -892,7 +988,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 20: + case 21: ttls = reader.readMessage("ttls"); if (!reader.isLastRead()) @@ -900,7 +996,15 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 21: + case 22: + updateCntrs = reader.readMessage("updateCntrs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 23: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -908,7 +1012,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 22: + case 24: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -928,7 +1032,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 23; + return 25; } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 2f2944d..43f34c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -613,7 +613,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (updateTop) { for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { if (top.cacheId() == cacheCtx.cacheId()) { - cacheCtx.topology().update(exchId, top.partitionMap(true)); + cacheCtx.topology().update(exchId, + top.partitionMap(true), + top.updateCounters()); break; } @@ -813,6 +815,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } + boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT; + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -823,6 +827,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (drCacheCtx.isDrEnabled()) drCacheCtx.dr().beforeExchange(topVer, exchId.isLeft()); + if (topChanged) + cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion()); + // Partition release future is done so we can flush the write-behind store. 
cacheCtx.store().forceFlush(); @@ -956,14 +963,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @param id ID. * @throws IgniteCheckedException If failed. */ - private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException { + private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) + throws IgniteCheckedException { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, clientOnlyExchange, cctx.versions().last()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) + if (!cacheCtx.isLocal()) { m.addLocalPartitionMap(cacheCtx.cacheId(), cacheCtx.topology().localPartitionMap()); + + m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); + } } if (log.isDebugEnabled()) @@ -989,15 +1000,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0; - if (ready) + if (ready) { m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); + + m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); + } } } // It is important that client topologies be added after contexts. 
- for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) + for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); + m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters()); + } + if (log.isDebugEnabled()) log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + ", exchId=" + exchId + ", msg=" + m + ']'); @@ -1334,15 +1351,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { Integer cacheId = entry.getKey(); + Map<Integer, Long> cntrMap = msg.partitionUpdateCounters(cacheId); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); if (cacheCtx != null) - cacheCtx.topology().update(exchId, entry.getValue()); + cacheCtx.topology().update(exchId, entry.getValue(), cntrMap); else { ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) - cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue()); + cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap); } } } @@ -1360,7 +1379,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridDhtPartitionTopology top = cacheCtx != null ? 
cacheCtx.topology() : cctx.exchange().clientTopology(cacheId, this); - top.update(exchId, entry.getValue()); + top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index c06d773..3f4f9bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Externalizable; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; @@ -48,6 +49,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** */ private byte[] partsBytes; + /** Partitions update counters. */ + @GridToStringInclude + @GridDirectTransient + private Map<Integer, Map<Integer, Long>> partCntrs; + + /** Serialized partitions counters. */ + private byte[] partCntrsBytes; + /** Topology version. */ private AffinityTopologyVersion topVer; @@ -92,13 +101,41 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa parts.put(cacheId, fullMap); } - /** {@inheritDoc} - * @param ctx*/ + /** + * @param cacheId Cache ID. + * @param cntrMap Partition update counters. 
+ */ + public void addPartitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) { + if (partCntrs == null) + partCntrs = new HashMap<>(); + + if (!partCntrs.containsKey(cacheId)) + partCntrs.put(cacheId, cntrMap); + } + + /** + * @param cacheId Cache ID. + * @return Partition update counters. + */ + public Map<Integer, Long> partitionUpdateCounters(int cacheId) { + if (partCntrs != null) { + Map<Integer, Long> res = partCntrs.get(cacheId); + + return res != null ? res : Collections.<Integer, Long>emptyMap(); + } + + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); if (parts != null && partsBytes == null) partsBytes = ctx.marshaller().marshal(parts); + + if (partCntrs != null) + partCntrsBytes = ctx.marshaller().marshal(partCntrs); } /** @@ -121,6 +158,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (partsBytes != null && parts == null) parts = ctx.marshaller().unmarshal(partsBytes, ldr); + + if (partCntrsBytes != null) + partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr); } /** {@inheritDoc} */ @@ -139,12 +179,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (writer.state()) { case 5: - if (!writer.writeByteArray("partsBytes", partsBytes)) + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); case 6: + if (!writer.writeByteArray("partsBytes", partsBytes)) + return false; + + writer.incrementState(); + + case 7: if (!writer.writeMessage("topVer", topVer)) return false; @@ -167,7 +213,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (reader.state()) { case 5: - partsBytes = reader.readByteArray("partsBytes"); + partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) return false; @@ -175,6 +221,14 @@ public class 
GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 6: + partsBytes = reader.readByteArray("partsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -194,7 +248,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 83fbb1a..a2366bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Externalizable; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; @@ -46,6 +47,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Serialized partitions. */ private byte[] partsBytes; + /** Partitions update counters. */ + @GridToStringInclude + @GridDirectTransient + private Map<Integer, Map<Integer, Long>> partCntrs; + + /** Serialized partitions counters. 
*/ + private byte[] partCntrsBytes; + /** */ private boolean client; @@ -90,6 +99,31 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** + * @param cacheId Cache ID. + * @param cntrMap Partition update counters. + */ + public void partitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) { + if (partCntrs == null) + partCntrs = new HashMap<>(); + + partCntrs.put(cacheId, cntrMap); + } + + /** + * @param cacheId Cache ID. + * @return Partition update counters. + */ + public Map<Integer, Long> partitionUpdateCounters(int cacheId) { + if (partCntrs != null) { + Map<Integer, Long> res = partCntrs.get(cacheId); + + return res != null ? res : Collections.<Integer, Long>emptyMap(); + } + + return Collections.emptyMap(); + } + + /** * @return Local partitions. */ public Map<Integer, GridDhtPartitionMap> partitions() { @@ -103,6 +137,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partsBytes == null && parts != null) partsBytes = ctx.marshaller().marshal(parts); + + if (partCntrs != null) + partCntrsBytes = ctx.marshaller().marshal(partCntrs); } /** {@inheritDoc} */ @@ -111,6 +148,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partsBytes != null && parts == null) parts = ctx.marshaller().unmarshal(partsBytes, ldr); + + if (partCntrsBytes != null) + partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr); } /** {@inheritDoc} */ @@ -135,6 +175,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); case 6: + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + return false; + + writer.incrementState(); + + case 7: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -165,6 +211,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 6: + partCntrsBytes = reader.readByteArray("partCntrsBytes"); + + if 
(!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -184,7 +238,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 1bf03a9..706655b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -249,7 +249,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*write-through*/false, /*read-through*/false, /*retval*/false, - /**expiry policy*/null, + /*expiry policy*/null, /*event*/true, /*metrics*/true, /*primary*/false, @@ -263,7 +263,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { false, false, subjId, - taskName); + taskName, + null, + null); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); @@ -361,7 +363,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { false, intercept, req.subjectId(), - taskName); + taskName, + null, + null); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index d078df4..ba58f57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -226,6 +226,13 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { } /** + * @param cntrs Partition indexes. + */ + @Override public void setPartitionUpdateCounters(long[] cntrs) { + // No-op. + } + + /** * Adds owned versions to map. * * @param vers Map of owned versions. http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java new file mode 100644 index 0000000..7db9026 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Batch acknowledgement. + */ +public class CacheContinuousQueryBatchAck extends GridCacheMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Routine ID. */ + private UUID routineId; + + /** Update counters. */ + @GridToStringInclude + @GridDirectMap(keyType = Integer.class, valueType = Long.class) + private Map<Integer, Long> updateCntrs; + + /** + * Default constructor. + */ + public CacheContinuousQueryBatchAck() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param routineId Routine ID. + * @param updateCntrs Update counters. 
+ */ + CacheContinuousQueryBatchAck(int cacheId, UUID routineId, Map<Integer, Long> updateCntrs) { + this.cacheId = cacheId; + this.routineId = routineId; + this.updateCntrs = updateCntrs; + } + + /** + * @return Routine ID. + */ + UUID routineId() { + return routineId; + } + + /** + * @return Update counters. + */ + Map<Integer, Long> updateCntrs() { + return updateCntrs; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeUuid("routineId", routineId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMap("updateCntrs", updateCntrs, MessageCollectionItemType.INT, + MessageCollectionItemType.LONG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + routineId = reader.readUuid("routineId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + updateCntrs = reader.readMap("updateCntrs", MessageCollectionItemType.INT, + MessageCollectionItemType.LONG, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CacheContinuousQueryBatchAck.class); + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 118; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + 
@Override public String toString() { + return S.toString(CacheContinuousQueryBatchAck.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index a4b35eb..0495e6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -22,10 +22,12 @@ import javax.cache.event.EventType; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -42,6 +44,12 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { private static final long serialVersionUID = 0L; /** */ + private static final byte BACKUP_ENTRY = 0b0001; + + /** */ + private static final byte FILTERED_ENTRY = 0b0010; + 
+ /** */ private static final EventType[] EVT_TYPE_VALS = EventType.values(); /** @@ -75,8 +83,24 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { @GridDirectTransient private GridDeploymentInfo depInfo; + /** Partition. */ + private int part; + + /** Update counter. */ + private long updateCntr; + + /** Flags. */ + private byte flags; + + /** */ + @GridToStringInclude + private AffinityTopologyVersion topVer; + + /** Filtered events. */ + private GridLongList filteredEvts; + /** - * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}. + * Required by {@link Message}. */ public CacheContinuousQueryEntry() { // No-op. @@ -88,18 +112,34 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @param key Key. * @param newVal New value. * @param oldVal Old value. + * @param part Partition. + * @param updateCntr Update partition counter. + * @param topVer Topology version if applicable. */ CacheContinuousQueryEntry( int cacheId, EventType evtType, KeyCacheObject key, @Nullable CacheObject newVal, - @Nullable CacheObject oldVal) { + @Nullable CacheObject oldVal, + int part, + long updateCntr, + @Nullable AffinityTopologyVersion topVer) { this.cacheId = cacheId; this.evtType = evtType; this.key = key; this.newVal = newVal; this.oldVal = oldVal; + this.part = part; + this.updateCntr = updateCntr; + this.topVer = topVer; + } + + /** + * @return Topology version if applicable. + */ + @Nullable AffinityTopologyVersion topologyVersion() { + return topVer; } /** @@ -117,6 +157,66 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** + * @return Partition. + */ + int partition() { + return part; + } + + /** + * @return Update counter. + */ + long updateCounter() { + return updateCntr; + } + + /** + * Mark that entry create on backup. + */ + void markBackup() { + flags |= BACKUP_ENTRY; + } + + /** + * Mark that entry filtered. 
+ */ + void markFiltered() { + flags |= FILTERED_ENTRY; + newVal = null; + oldVal = null; + key = null; + depInfo = null; + } + + /** + * @return {@code True} if entry sent by backup node. + */ + boolean isBackup() { + return (flags & BACKUP_ENTRY) != 0; + } + + /** + * @return {@code True} if entry was filtered. + */ + boolean isFiltered() { + return (flags & FILTERED_ENTRY) != 0; + } + + /** + * @param cntrs Filtered events. + */ + void filteredEvents(GridLongList cntrs) { + filteredEvts = cntrs; + } + + /** + * @return previous filtered events. + */ + long[] filteredEvents() { + return filteredEvts == null ? null : filteredEvts.array(); + } + + /** * @param cctx Cache context. * @throws IgniteCheckedException In case of error. */ @@ -138,13 +238,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @throws IgniteCheckedException In case of error. */ void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException { - key.finishUnmarshal(cctx.cacheObjectContext(), ldr); + if (!isFiltered()) { + key.finishUnmarshal(cctx.cacheObjectContext(), ldr); - if (newVal != null) - newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); + if (newVal != null) + newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); - if (oldVal != null) - oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); + if (oldVal != null) + oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); + } } /** @@ -208,23 +310,53 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); case 2: - if (!writer.writeMessage("key", key)) + if (!writer.writeMessage("filteredEvts", filteredEvts)) return false; writer.incrementState(); case 3: - if (!writer.writeMessage("newVal", newVal)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 4: + if (!writer.writeMessage("key", key)) + return false; + + writer.incrementState(); + + case 5: + if 
(!writer.writeMessage("newVal", newVal)) + return false; + + writer.incrementState(); + + case 6: if (!writer.writeMessage("oldVal", oldVal)) return false; writer.incrementState(); + case 7: + if (!writer.writeInt("part", part)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeLong("updateCntr", updateCntr)) + return false; + + writer.incrementState(); + } return true; @@ -259,7 +391,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 2: - key = reader.readMessage("key"); + filteredEvts = reader.readMessage("filteredEvts"); if (!reader.isLastRead()) return false; @@ -267,7 +399,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 3: - newVal = reader.readMessage("newVal"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -275,6 +407,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 4: + key = reader.readMessage("key"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + newVal = reader.readMessage("newVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: oldVal = reader.readMessage("oldVal"); if (!reader.isLastRead()) @@ -282,6 +430,30 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); + case 7: + part = reader.readInt("part"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + updateCntr = reader.readLong("updateCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return 
reader.afterMessageRead(CacheContinuousQueryEntry.class); @@ -289,7 +461,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index 7417138..a1ebe39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -58,8 +58,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { } /** {@inheritDoc} */ - @Override - public K getKey() { + @Override public K getKey() { return e.key().value(cctx.cacheObjectContext(), false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index e517c70..b69d4cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -21,8 +21,21 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.event.EventType; @@ -30,26 +43,37 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import 
org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; +import org.apache.ignite.internal.util.GridConcurrentSkipListSet; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; @@ -61,6 +85,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** */ private static final long serialVersionUID = 0L; + /** */ + private static final int BACKUP_ACK_THRESHOLD = 100; + /** Cache name. */ private String cacheName; @@ -97,9 +124,27 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** Whether to skip primary check for REPLICATED cache. */ private transient boolean skipPrimaryCheck; + /** Backup queue. 
*/ + private transient Collection<CacheContinuousQueryEntry> backupQueue; + + /** */ + private boolean localCache; + + /** */ + private transient ConcurrentMap<Integer, PartitionRecovery> rcvs; + + /** */ + private transient ConcurrentMap<Integer, EntryBuffer> entryBufs; + + /** */ + private transient AcknowledgeBuffer ackBuf; + /** */ private transient int cacheId; + /** */ + private Map<Integer, Long> initUpdCntrs; + /** * Required by {@link Externalizable}. */ @@ -121,6 +166,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @param ignoreExpired Ignore expired events flag. * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. * @param taskHash Task name hash code. + * @param locCache {@code True} if local cache. */ public CacheContinuousQueryHandler( String cacheName, @@ -133,7 +179,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { boolean sync, boolean ignoreExpired, int taskHash, - boolean skipPrimaryCheck) { + boolean skipPrimaryCheck, + boolean locCache) { assert topic != null; assert locLsnr != null; @@ -148,6 +195,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.ignoreExpired = ignoreExpired; this.taskHash = taskHash; this.skipPrimaryCheck = skipPrimaryCheck; + this.localCache = locCache; cacheId = CU.cacheId(cacheName); } @@ -173,6 +221,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public void updateCounters(Map<Integer, Long> cntrs) { + this.initUpdCntrs = cntrs; + } + + /** {@inheritDoc} */ @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { assert nodeId != null; @@ -185,8 +238,32 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (rmtFilter != null) ctx.resource().injectGeneric(rmtFilter); + entryBufs = new ConcurrentHashMap<>(); + + backupQueue 
= new ConcurrentLinkedDeque8<>(); + + ackBuf = new AcknowledgeBuffer(); + + rcvs = new ConcurrentHashMap<>(); + final boolean loc = nodeId.equals(ctx.localNodeId()); + assert !skipPrimaryCheck || loc; + + final GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (!internal && cctx != null && initUpdCntrs != null) { + Map<Integer, Long> map = cctx.topology().updateCounters(); + + for (Map.Entry<Integer, Long> e : map.entrySet()) { + Long cntr0 = initUpdCntrs.get(e.getKey()); + Long cntr1 = e.getValue(); + + if (cntr0 == null || cntr1 > cntr0) + initUpdCntrs.put(e.getKey(), cntr1); + } + } + CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { @Override public void onExecution() { if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -212,11 +289,15 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) return; - GridCacheContext<K, V> cctx = cacheContext(ctx); + final GridCacheContext<K, V> cctx = cacheContext(ctx); - if (cctx.isReplicated() && !skipPrimaryCheck && !primary) + // Check that cache stopped. + if (cctx == null) return; + // skipPrimaryCheck is set only when listen locally for replicated cache events. + assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); + boolean notify = true; if (rmtFilter != null) { @@ -228,54 +309,94 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } - if (notify) { - if (loc) - locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt)); - else { - try { - if (cctx.deploymentEnabled() && ctx.discovery().node(nodeId) != null) { - evt.entry().prepareMarshal(cctx); - - cctx.deploy().prepare(evt.entry()); + try { + final CacheContinuousQueryEntry entry = evt.entry(); + + if (!notify) + entry.markFiltered(); + + if (primary || skipPrimaryCheck) { + if (loc) { + if (!localCache) { + Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx, entry); + + if (!entries.isEmpty()) { + final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); + + Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries, + new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { + @Override public CacheEntryEvent<? extends K, ? extends V> apply( + CacheContinuousQueryEntry e) { + return new CacheContinuousQueryEvent<>(cache, cctx, e); + } + }, + new IgnitePredicate<CacheContinuousQueryEntry>() { + @Override public boolean apply(CacheContinuousQueryEntry entry) { + return !entry.isFiltered(); + } + } + ); + + locLsnr.onUpdated(evts); + + if (!internal && !skipPrimaryCheck) + sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); + } + } + else { + if (!entry.isFiltered()) + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt)); } - else - evt.entry().prepareMarshal(cctx); - - ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true); } - catch (ClusterTopologyCheckedException ex) { - IgniteLogger log = ctx.log(getClass()); + else { + if (!entry.isFiltered()) + prepareEntry(cctx, nodeId, entry); - if (log.isDebugEnabled()) - log.debug("Failed to send event notification to node, node left cluster " + - "[node=" + nodeId + ", err=" + ex + ']'); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + CacheContinuousQueryEntry e = handleEntry(entry); + + if (e != null) + ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); } } + else { + if (!internal) { + entry.markBackup(); - if (recordIgniteEvt) { - ctx.event().record(new CacheQueryReadEvent<>( - ctx.discovery().localNode(), - "Continuous query executed.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.CONTINUOUS.name(), - cacheName, - null, - null, - null, - rmtFilter, - null, - nodeId, - taskName(), - evt.getKey(), - evt.getValue(), - evt.getOldValue(), - null - )); + backupQueue.add(entry); + } } } + catch (ClusterTopologyCheckedException ex) { + IgniteLogger log = ctx.log(getClass()); + + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + } + + if (recordIgniteEvt && notify) { + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.CONTINUOUS.name(), + cacheName, + null, + null, + null, + rmtFilter, + null, + nodeId, + taskName(), + evt.getKey(), + evt.getValue(), + evt.getOldValue(), + null + )); + } } @Override public void onUnregister() { @@ -283,6 
+404,85 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister(); } + @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) { + Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator(); + + while (it.hasNext()) { + CacheContinuousQueryEntry backupEntry = it.next(); + + Long updateCntr = updateCntrs.get(backupEntry.partition()); + + if (updateCntr != null && backupEntry.updateCounter() <= updateCntr) + it.remove(); + } + } + + @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) { + if (backupQueue.isEmpty()) + return; + + try { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + for (CacheContinuousQueryEntry e : backupQueue) { + if (!e.isFiltered()) + prepareEntry(cctx, nodeId, e); + } + + ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic); + + backupQueue.clear(); + } + catch (IgniteCheckedException e) { + U.error(ctx.log(getClass()), "Failed to send backup event notification to node: " + nodeId, e); + } + } + + @Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) { + sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx); + } + + @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer) { + try { + assert evt != null; + + CacheContinuousQueryEntry e = evt.entry(); + + EntryBuffer buf = entryBufs.get(e.partition()); + + if (buf == null) { + buf = new EntryBuffer(); + + EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf); + + if (oldRec != null) + buf = oldRec; + } + + e = buf.skipEntry(e); + + if (e != null) + ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true); + } + catch (ClusterTopologyCheckedException ex) { + IgniteLogger log = ctx.log(getClass()); + + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + 
nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + } + } + + @Override public void onPartitionEvicted(int part) { + for (Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator(); it.hasNext();) { + if (it.next().partition() == part) + it.remove(); + } + } + @Override public boolean oldValueRequired() { return oldValRequired; } @@ -304,6 +504,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { return mgr.registerListener(routineId, lsnr, internal); } + /** + * @param cctx Context. + * @param nodeId ID of the node that started routine. + * @param entry Entry. + * @throws IgniteCheckedException In case of error. + */ + private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry) + throws IgniteCheckedException { + if (cctx.kernalContext().config().isPeerClassLoadingEnabled() && cctx.discovery().node(nodeId) != null) { + entry.prepareMarshal(cctx); + + cctx.deploy().prepare(entry); + } + else + entry.prepareMarshal(cctx); + } + /** {@inheritDoc} */ @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { // No-op. @@ -366,17 +583,377 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); - Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries, + Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>(); + + for (CacheContinuousQueryEntry e : entries) + entries0.addAll(handleEvent(ctx, e)); + + Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { @Override public CacheEntryEvent<? extends K, ? 
extends V> apply(CacheContinuousQueryEntry e) { return new CacheContinuousQueryEvent<>(cache, cctx, e); } + }, + new IgnitePredicate<CacheContinuousQueryEntry>() { + @Override public boolean apply(CacheContinuousQueryEntry entry) { + return !entry.isFiltered(); + } } ); locLsnr.onUpdated(evts); } + /** + * @param ctx Context. + * @param e entry. + * @return Entry collection. + */ + private Collection<CacheContinuousQueryEntry> handleEvent(GridKernalContext ctx, + CacheContinuousQueryEntry e) { + assert e != null; + + if (internal) { + if (e.isFiltered()) + return Collections.emptyList(); + else + return F.asList(e); + } + + // Initial query entry or evicted entry. + // This events should be fired immediately. + if (e.updateCounter() == -1) + return F.asList(e); + + PartitionRecovery rec = rcvs.get(e.partition()); + + if (rec == null) { + rec = new PartitionRecovery(ctx.log(getClass()), cacheContext(ctx).topology().topologyVersion(), + initUpdCntrs == null ? null : initUpdCntrs.get(e.partition())); + + PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec); + + if (oldRec != null) + rec = oldRec; + } + + return rec.collectEntries(e); + } + + /** + * @param e Entry. + * @return Entry. + */ + private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) { + assert e != null; + assert entryBufs != null; + + if (internal) { + if (e.isFiltered()) + return null; + else + return e; + } + + // Initial query entry. + // This events should be fired immediately. + if (e.updateCounter() == -1) + return e; + + EntryBuffer buf = entryBufs.get(e.partition()); + + if (buf == null) { + buf = new EntryBuffer(); + + EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf); + + if (oldRec != null) + buf = oldRec; + } + + return buf.handle(e); + } + + /** + * + */ + private static class PartitionRecovery { + /** Event which means hole in sequence. 
*/ + private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry(); + + /** */ + private final static int MAX_BUFF_SIZE = 100; + + /** */ + private IgniteLogger log; + + /** */ + private long lastFiredEvt; + + /** */ + private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE; + + /** */ + private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); + + /** + * @param log Logger. + * @param topVer Topology version. + * @param initCntr Update counters. + */ + public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) { + this.log = log; + + if (initCntr != null) { + this.lastFiredEvt = initCntr; + + curTop = topVer; + } + } + + /** + * Add continuous entry. + * + * @param entry Cache continuous query entry. + * @return Collection entries which will be fired. + */ + public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) { + assert entry != null; + + List<CacheContinuousQueryEntry> entries; + + synchronized (pendingEvts) { + // Received first event. + if (curTop == AffinityTopologyVersion.NONE) { + lastFiredEvt = entry.updateCounter(); + + curTop = entry.topologyVersion(); + + return F.asList(entry); + } + + if (curTop.compareTo(entry.topologyVersion()) < 0) { + if (entry.updateCounter() == 1 && !entry.isBackup()) { + entries = new ArrayList<>(pendingEvts.size()); + + for (CacheContinuousQueryEntry evt : pendingEvts.values()) { + if (evt != HOLE && !evt.isFiltered()) + entries.add(evt); + } + + pendingEvts.clear(); + + curTop = entry.topologyVersion(); + + lastFiredEvt = entry.updateCounter(); + + entries.add(entry); + + return entries; + } + + curTop = entry.topologyVersion(); + } + + // Check duplicate. + if (entry.updateCounter() > lastFiredEvt) { + pendingEvts.put(entry.updateCounter(), entry); + + // Put filtered events. 
+ if (entry.filteredEvents() != null) { + for (long cnrt : entry.filteredEvents()) { + if (cnrt > lastFiredEvt) + pendingEvts.put(cnrt, HOLE); + } + } + } + else { + if (log.isDebugEnabled()) + log.debug("Skip duplicate continuous query message: " + entry); + + return Collections.emptyList(); + } + + if (pendingEvts.isEmpty()) + return Collections.emptyList(); + + Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator(); + + entries = new ArrayList<>(); + + if (pendingEvts.size() >= MAX_BUFF_SIZE) { + for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) { + Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); + + if (e.getValue() != HOLE && !e.getValue().isFiltered()) + entries.add(e.getValue()); + + lastFiredEvt = e.getKey(); + + iter.remove(); + } + } + else { + // Elements are consistently. + while (iter.hasNext()) { + Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); + + if (e.getKey() == lastFiredEvt + 1) { + ++lastFiredEvt; + + if (e.getValue() != HOLE && !e.getValue().isFiltered()) + entries.add(e.getValue()); + + iter.remove(); + } + else + break; + } + } + } + + return entries; + } + } + + /** + * + */ + private static class EntryBuffer { + /** */ + private final static int MAX_BUFF_SIZE = 100; + + /** */ + private final GridConcurrentSkipListSet<Long> buf = new GridConcurrentSkipListSet<>(); + + /** */ + private AtomicLong lastFiredCntr = new AtomicLong(); + + /** + * @param newVal New value. + * @return Old value if previous value less than new value otherwise {@code -1}. + */ + private long updateFiredCounter(long newVal) { + long prevVal = lastFiredCntr.get(); + + while (prevVal < newVal) { + if (lastFiredCntr.compareAndSet(prevVal, newVal)) + return prevVal; + else + prevVal = lastFiredCntr.get(); + } + + return prevVal >= newVal ? -1 : prevVal; + } + + /** + * @param e Entry. + * @param topVer Topology version. + * @return Continuous query entry. 
+ */ + private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) { + if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) { + + e.markFiltered(); + + return e; + } + else { + buf.add(e.updateCounter()); + + // Double check. If another thread sent a event with counter higher than this event. + if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) { + buf.remove(e.updateCounter()); + + e.markFiltered(); + + return e; + } + else + return null; + } + } + + /** + * Add continuous entry. + * + * @param e Cache continuous query entry. + * @return Collection entries which will be fired. + */ + public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) { + assert e != null; + + if (e.isFiltered()) { + Long last = buf.lastx(); + Long first = buf.firstx(); + + if (last != null && first != null && last - first >= MAX_BUFF_SIZE) { + NavigableSet<Long> prevHoles = buf.subSet(first, true, last, true); + + GridLongList filteredEvts = new GridLongList((int)(last - first)); + + int size = 0; + + Long cntr; + + while ((cntr = prevHoles.pollFirst()) != null) { + filteredEvts.add(cntr); + + ++size; + } + + filteredEvts.truncate(size, true); + + e.filteredEvents(filteredEvts); + + return e; + } + + if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) + return e; + else { + buf.add(e.updateCounter()); + + // Double check. If another thread sent a event with counter higher than this event. 
+ if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) { + buf.remove(e.updateCounter()); + + return e; + } + else + return null; + } + } + else { + long prevVal = updateFiredCounter(e.updateCounter()); + + if (prevVal == -1) + return e; + else { + NavigableSet<Long> prevHoles = buf.subSet(prevVal, true, e.updateCounter(), true); + + GridLongList filteredEvts = new GridLongList((int)(e.updateCounter() - prevVal)); + + int size = 0; + + Long cntr; + + while ((cntr = prevHoles.pollFirst()) != null) { + filteredEvts.add(cntr); + + ++size; + } + + filteredEvts.truncate(size, true); + + e.filteredEvents(filteredEvts); + + return e; + } + } + } + } + /** {@inheritDoc} */ @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { assert ctx != null; @@ -397,6 +974,65 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousBatch createBatch() { + return new GridContinuousBatchAdapter(); + } + + /** {@inheritDoc} */ + @Override public void onBatchAcknowledged(final UUID routineId, + GridContinuousBatch batch, + final GridKernalContext ctx) { + sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx); + } + + /** + * @param t Acknowledge information. + * @param routineId Routine ID. + * @param ctx Context. 
+     */
+    private void sendBackupAcknowledge(final IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> t,
+        final UUID routineId,
+        final GridKernalContext ctx) {
+        if (t != null) {
+            ctx.closure().runLocalSafe(new Runnable() { // Sending is done inside the Runnable handed to runLocalSafe().
+                @Override public void run() {
+                    GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                    CacheContinuousQueryBatchAck msg = new CacheContinuousQueryBatchAck(cctx.cacheId(),
+                        routineId,
+                        t.get1());
+
+                    Collection<ClusterNode> nodes = new HashSet<>(); // Set: a node is collected once across versions.
+
+                    for (AffinityTopologyVersion topVer : t.get2())
+                        nodes.addAll(ctx.discovery().cacheNodes(topVer));
+
+                    for (ClusterNode node : nodes) {
+                        if (!node.id().equals(ctx.localNodeId())) { // Skip the local node.
+                            try {
+                                cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
+                            }
+                            catch (ClusterTopologyCheckedException e) { // Only logged at debug level; loop continues.
+                                IgniteLogger log = ctx.log(getClass());
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to send acknowledge message, node left " +
+                                        "[msg=" + msg + ", node=" + node + ']');
+                            }
+                            catch (IgniteCheckedException e) { // Logged as an error; loop continues with the next node.
+                                IgniteLogger log = ctx.log(getClass());
+
+                                U.error(log, "Failed to send acknowledge message " +
+                                    "[msg=" + msg + ", node=" + node + ']', e);
+                            }
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return topic;
     }
@@ -471,6 +1107,93 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         return ctx.cache().<K, V>context().cacheContext(cacheId);
     }
 
+    /** Accumulates acknowledged entries and decides when an acknowledge should be sent to backups. */
+    private static class AcknowledgeBuffer {
+        /** Number of entries added since the last {@link #acknowledgeData()} call. */
+        private int size;
+
+        /** Highest update counter seen so far for each partition (not reset by {@link #acknowledgeData()}). */
+        @GridToStringInclude
+        private Map<Integer, Long> updateCntrs = new HashMap<>();
+
+        /** Topology versions of entries added since the last {@link #acknowledgeData()} call. */
+        @GridToStringInclude
+        private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
+
+        /**
+         * @param batch Acknowledged batch; its size is added to the entry count and its entries are recorded.
+         * @return Non-null tuple if acknowledge should be sent to backups (entry count reached BACKUP_ACK_THRESHOLD).
+         */
+        @SuppressWarnings("unchecked")
+        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+        onAcknowledged(GridContinuousBatch batch) {
+            size += batch.size();
+
+            Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect(); // Raw cast, hence the suppression.
+
+            for (CacheContinuousQueryEntry e : entries)
+                addEntry(e);
+
+            return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+        }
+
+        /**
+         * @param e Acknowledged entry; counted as one and recorded.
+         * @return Non-null tuple if acknowledge should be sent to backups (entry count reached BACKUP_ACK_THRESHOLD).
+         */
+        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+        onAcknowledged(CacheContinuousQueryEntry e) {
+            size++;
+
+            addEntry(e);
+
+            return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+        }
+
+        /**
+         * @param e Entry whose topology version is recorded and whose counter may raise the partition's maximum.
+         */
+        private void addEntry(CacheContinuousQueryEntry e) {
+            topVers.add(e.topologyVersion());
+
+            Long cntr0 = updateCntrs.get(e.partition());
+
+            if (cntr0 == null || e.updateCounter() > cntr0) // Keep only the highest counter per partition.
+                updateCntrs.put(e.partition(), e.updateCounter());
+        }
+
+        /**
+         * @return Non-null tuple if acknowledge should be sent to backups (at least one entry is pending).
+         */
+        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+        acknowledgeOnTimeout() {
+            return size > 0 ? acknowledgeData() : null;
+        }
+
+        /**
+         * @return Tuple with acknowledge information: copy of the per-partition counters and the topology versions.
+         */
+        private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
+            assert size > 0;
+
+            Map<Integer, Long> cntrs = new HashMap<>(updateCntrs); // Copy: later updates do not affect the result.
+
+            IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
+                new IgniteBiTuple<>(cntrs, topVers);
+
+            topVers = U.newHashSet(1); // The old set now belongs to the returned tuple; start a fresh one.
+
+            size = 0;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AcknowledgeBuffer.class, this);
+        }
+    }
+
     /**
      * Deployable object.
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index a3c19a9..8342acf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -17,6 +17,10 @@ package org.apache.ignite.internal.processors.cache.query.continuous; +import java.util.Map; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; + /** * Continuous query listener. */ @@ -41,6 +45,37 @@ interface CacheContinuousQueryListener<K, V> { public void onUnregister(); /** + * Cleans backup queue. + * + * @param updateCntrs Update indexes map. + */ + public void cleanupBackupQueue(Map<Integer, Long> updateCntrs); + + /** + * Flushes backup queue. + * + * @param ctx Context. + * @param topVer Topology version. + */ + public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer); + + /** + * @param ctx Context. + */ + public void acknowledgeBackupOnTimeout(GridKernalContext ctx); + + /** + * @param evt Event + * @param topVer Topology version. + */ + public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer); + + /** + * @param part Partition. + */ + public void onPartitionEvicted(int part); + + /** * @return Whether old value is required. */ public boolean oldValueRequired();
