IGNITE-426 temp commit.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/384e57ca Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/384e57ca Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/384e57ca Branch: refs/heads/ignite-426-2-reb Commit: 384e57caeb827397f4c37ea5e6e8952ae6b65b03 Parents: e769c3a Author: nikolay_tikhonov <[email protected]> Authored: Wed Sep 2 15:38:50 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Wed Oct 28 15:13:21 2015 +0300 ---------------------------------------------------------------------- .../dht/GridClientPartitionTopology.java | 40 +- .../distributed/dht/GridDhtLocalPartition.java | 61 +- .../dht/GridDhtPartitionTopology.java | 26 +- .../dht/GridDhtPartitionTopologyImpl.java | 112 +- .../CacheContinuousQueryBatchAck.java | 156 +++ .../continuous/CacheContinuousQueryEntry.java | 74 +- .../continuous/CacheContinuousQueryHandler.java | 347 +++++- .../CacheContinuousQueryListener.java | 34 +- .../continuous/CacheContinuousQueryManager.java | 88 +- .../continuous/GridContinuousBatch.java | 7 + .../continuous/GridContinuousBatchAdapter.java | 7 + .../continuous/GridContinuousProcessor.java | 173 ++- ...acheContinuousQueryFailoverAbstractTest.java | 1104 ++++++++++++++++++ ...ueryFailoverAtomicPrimaryWriteOrderTest.java | 32 + ...inuousQueryFailoverAtomicReplicatedTest.java | 39 + .../CacheContinuousQueryFailoverAtomicTest.java | 39 + ...ContinuousQueryFailoverTxReplicatedTest.java | 32 + .../CacheContinuousQueryFailoverTxTest.java | 39 + 18 files changed, 2313 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 162c116..516b7bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -94,6 +94,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** Lock. */ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + /** Partition update counter. */ + private Map<Integer, Long> cntrMap = new HashMap<>(); + /** * @param cctx Context. * @param cacheId Cache ID. @@ -527,7 +530,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionFullMap partMap) { + GridDhtPartitionFullMap partMap, + Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); @@ -602,6 +606,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { part2node = p2n; + if (cntrMap != null) + this.cntrMap = new HashMap<>(cntrMap); + consistencyCheck(); if (log.isDebugEnabled()) @@ -617,7 +624,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts) { + GridDhtPartitionMap parts, + Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -698,6 +706,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } } + if (cntrMap != null) { + for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { + Long cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr < e.getValue()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + } + consistencyCheck(); if (log.isDebugEnabled()) @@ -852,6 +869,25 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public Map<Integer, Long> updateCounters() { + lock.readLock().lock(); + + try { + return new HashMap<>(cntrMap); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { + assert false; + + return false; + } + + /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 4f124e6..975d76c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -17,6 +17,18 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicStampedReference; +import java.util.concurrent.locks.ReentrantLock; +import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; @@ -111,11 +123,14 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, private final LongAdder8 mapPubSize = new LongAdder8(); /** Remove queue. */ - private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue; + private GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue; /** Group reservations. */ private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>(); + /** Continuous query update index. */ + private final AtomicLong contQryUpdIdx = new AtomicLong(); + /** * @param cctx Context. * @param id Partition ID. @@ -141,7 +156,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 : Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20); - rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize)); + if (cctx.deferredDelete()) + rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize)); } /** @@ -295,6 +311,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @throws IgniteCheckedException If failed. */ public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) throws IgniteCheckedException { + assert cctx.deferredDelete(); + try { T2<KeyCacheObject, GridCacheVersion> evicted = rmvQueue.add(new T2<>(key, ver)); @@ -496,7 +514,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); - clearDeferredDeletes(); + if (cctx.deferredDelete()) + clearDeferredDeletes(); return new GridFinishedFuture<>(true); } @@ -541,13 +560,16 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, if (cctx.isDrEnabled()) cctx.dr().partitionEvicted(id); + cctx.continuousQueries().onPartitionEvicted(id); + cctx.dataStructures().onPartitionEvicted(id); rent.onDone(); ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); - clearDeferredDeletes(); + if (cctx.deferredDelete()) + clearDeferredDeletes(); return true; } @@ -604,6 +626,35 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } /** + * @return Next update index. + */ + public long nextContinuousQueryUpdateIndex() { + return contQryUpdIdx.incrementAndGet(); + } + + /** + * @return Current update index. + */ + public long continuousQueryUpdateIndex() { + return contQryUpdIdx.get(); + } + + /** + * @param val Update index value. + */ + public void continuousQueryUpdateIndex(long val) { + while (true) { + long val0 = contQryUpdIdx.get(); + + if (val0 >= val) + break; + + if (contQryUpdIdx.compareAndSet(val0, val)) + break; + } + } + + /** * Clears values for this partition. */ private void clearAll() { @@ -753,6 +804,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * */ private void clearDeferredDeletes() { + assert cctx.deferredDelete(); + rmvQueue.forEach(new CI1<T2<KeyCacheObject, GridCacheVersion>>() { @Override public void apply(T2<KeyCacheObject, GridCacheVersion> t) { cctx.dht().removeVersionedEntry(t.get1(), t.get2()); http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index d642314..3ac2b85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; @@ -51,6 +52,8 @@ public interface GridDhtPartitionTopology { * * @param exchId Exchange ID. * @param exchFut Exchange future. + * @param updateSeq Update sequence. + * @param stopping Stopping flag. * @throws IgniteInterruptedCheckedException If interrupted. */ public void updateTopologyVersion( @@ -193,17 +196,27 @@ public interface GridDhtPartitionTopology { /** * @param exchId Exchange ID. * @param partMap Update partition map. + * @param cntrMap Partition update counters. * @return Local partition map if there were evictions or {@code null} otherwise. */ - public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap); + public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, + GridDhtPartitionFullMap partMap, + @Nullable Map<Integer, Long> cntrMap); /** * @param exchId Exchange ID. * @param parts Partitions. + * @param cntrMap Partition update counters. * @return Local partition map if there were evictions or {@code null} otherwise. */ @Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts); + GridDhtPartitionMap parts, + @Nullable Map<Integer, Long> cntrMap); + + /** + * @return Partition update counters. + */ + public Map<Integer, Long> updateCounters(); /** * @param part Partition to own. @@ -213,6 +226,7 @@ public interface GridDhtPartitionTopology { /** * @param part Evicted partition. + * @param updateSeq Update sequence increment flag. */ public void onEvicted(GridDhtLocalPartition part, boolean updateSeq); @@ -228,4 +242,10 @@ public interface GridDhtPartitionTopology { * @param threshold Threshold for number of entries. */ public void printMemoryStats(int threshold); -} \ No newline at end of file + + /** + * @param topVer Topology version. + * @return {@code True} if rebalance process finished. + */ + public boolean rebalanceFinished(AffinityTopologyVersion topVer); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 6bd283a..5d312b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -102,6 +102,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** Lock. */ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + /** Partition update counter. */ + private Map<Integer, Long> cntrMap = new HashMap<>(); + + /** */ + private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; + /** * @param cctx Context. */ @@ -131,6 +137,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { topReadyFut = null; topVer = AffinityTopologyVersion.NONE; + + rebalancedTopVer = AffinityTopologyVersion.NONE; } finally { lock.writeLock().unlock(); @@ -220,6 +228,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { updateSeq.setIfGreater(updSeq); topReadyFut = exchFut; + + rebalancedTopVer = AffinityTopologyVersion.NONE;; } finally { lock.writeLock().unlock(); @@ -525,6 +535,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + updateRebalanceVersion(); + consistencyCheck(); } finally { @@ -732,7 +744,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param states Additional partition states. * @return List of nodes for the partition. */ - private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { + private List<ClusterNode> nodes(int p, + AffinityTopologyVersion topVer, + GridDhtPartitionState state, + GridDhtPartitionState... states) { Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null; lock.readLock().lock(); @@ -831,7 +846,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionFullMap partMap) { + GridDhtPartitionFullMap partMap, + @Nullable Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); @@ -911,8 +927,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { part2node = p2n; + if (cntrMap != null) { + for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { + Long cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr < e.getValue()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + + for (GridDhtLocalPartition part : locParts.values()) { + Long cntr = cntrMap.get(part.id()); + + if (cntr != null) + part.continuousQueryUpdateIndex(cntr); + } + } + boolean changed = checkEvictions(updateSeq); + updateRebalanceVersion(); + consistencyCheck(); if (log.isDebugEnabled()) @@ -928,7 +962,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts) { + GridDhtPartitionMap parts, + @Nullable Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -1006,8 +1041,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + if (cntrMap != null) { + for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { + Long cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr < e.getValue()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + + for (GridDhtLocalPartition part : locParts.values()) { + Long cntr = cntrMap.get(part.id()); + + if (cntr != null) + part.continuousQueryUpdateIndex(cntr); + } + } + changed |= checkEvictions(updateSeq); + updateRebalanceVersion(); + consistencyCheck(); if (log.isDebugEnabled()) @@ -1204,6 +1257,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (part.own()) { updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet()); + updateRebalanceVersion(); + consistencyCheck(); return true; @@ -1254,14 +1309,61 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public Map<Integer, Long> updateCounters() { + lock.readLock().lock(); + + try { + Map<Integer, Long> res = new HashMap<>(cntrMap); + + for (GridDhtLocalPartition part : locParts.values()) { + Long cntr0 = res.get(part.id()); + Long cntr1 = part.continuousQueryUpdateIndex(); + + if (cntr0 == null || cntr1 > cntr0) + res.put(part.id(), cntr1); + } + + return res; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { + return topVer.equals(rebalancedTopVer); + } + + /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { - X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); + X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); for (GridDhtLocalPartition part : locParts.values()) { int size = part.size(); if (size >= threshold) - X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']'); + X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']'); + } + } + + /** + * + */ + private void updateRebalanceVersion() { + if (!rebalancedTopVer.equals(topVer)) { + for (int i = 0; i < cctx.affinity().partitions(); i++) { + List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer); + List<ClusterNode> owners = owners(i); + + if (affNodes.size() != owners.size() || !owners.containsAll(affNodes)) + return; + } + + rebalancedTopVer = topVer; + + if (log.isDebugEnabled()) + log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']'); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java new file mode 100644 index 0000000..1e9a848 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Batch acknowledgement. + */ +public class CacheContinuousQueryBatchAck extends GridCacheMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Routine ID. */ + private UUID routineId; + + /** Update indexes. */ + @GridToStringInclude + @GridDirectMap(keyType = Integer.class, valueType = Long.class) + private Map<Integer, Long> updateIdxs; + + /** + * Default constructor. + */ + public CacheContinuousQueryBatchAck() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param routineId Routine ID. + * @param updateIdxs Update indexes. + */ + CacheContinuousQueryBatchAck(int cacheId, UUID routineId, Map<Integer, Long> updateIdxs) { + this.cacheId = cacheId; + this.routineId = routineId; + this.updateIdxs = updateIdxs; + } + + /** + * @return Routine ID. + */ + UUID routineId() { + return routineId; + } + + /** + * @return Update indexes. + */ + Map<Integer, Long> updateIndexes() { + return updateIdxs; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeUuid("routineId", routineId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMap("updateIdxs", updateIdxs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + routineId = reader.readUuid("routineId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + updateIdxs = reader.readMap("updateIdxs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 114; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryBatchAck.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index a4b35eb..9ea9b73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -22,6 +22,7 @@ import javax.cache.event.EventType; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; @@ -75,6 +76,17 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { @GridDirectTransient private GridDeploymentInfo depInfo; + /** Partition. */ + private int part; + + /** Update index. */ + private long updateIdx; + + /** */ + @GridToStringInclude + @GridDirectTransient + private AffinityTopologyVersion topVer; + /** * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}. */ @@ -88,18 +100,34 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @param key Key. * @param newVal New value. * @param oldVal Old value. + * @param part Partition. + * @param updateIdx Update index. + * @param topVer Topology version if applicable. */ CacheContinuousQueryEntry( int cacheId, EventType evtType, KeyCacheObject key, @Nullable CacheObject newVal, - @Nullable CacheObject oldVal) { + @Nullable CacheObject oldVal, + int part, + long updateIdx, + @Nullable AffinityTopologyVersion topVer) { this.cacheId = cacheId; this.evtType = evtType; this.key = key; this.newVal = newVal; this.oldVal = oldVal; + this.part = part; + this.updateIdx = updateIdx; + this.topVer = topVer; + } + + /** + * @return Topology version if applicable. + */ + @Nullable AffinityTopologyVersion topologyVersion() { + return topVer; } /** @@ -117,6 +145,20 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** + * @return Partition. + */ + int partition() { + return part; + } + + /** + * @return Update index. + */ + long updateIndex() { + return updateIdx; + } + + /** * @param cctx Cache context. * @throws IgniteCheckedException In case of error. */ @@ -225,6 +267,18 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); + case 5: + if (!writer.writeInt("part", part)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeLong("updateIdx", updateIdx)) + return false; + + writer.incrementState(); + } return true; @@ -282,6 +336,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); + case 5: + part = reader.readInt("part"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + updateIdx = reader.readLong("updateIdx"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(CacheContinuousQueryEntry.class); @@ -289,7 +359,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 7; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index e517c70..4f783db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -22,7 +22,12 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.event.EventType; @@ -35,13 +40,17 @@ import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; import org.apache.ignite.internal.util.typedef.C1; @@ -49,7 +58,10 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; @@ -61,6 +73,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** */ private static final long serialVersionUID = 0L; + /** */ + private static final int BACKUP_ACK_THRESHOLD = 100; + /** Cache name. */ private String cacheName; @@ -97,6 +112,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** Whether to skip primary check for REPLICATED cache. */ private transient boolean skipPrimaryCheck; + /** Backup queue. */ + private transient Collection<CacheContinuousQueryEntry> backupQueue; + + /** */ + private transient Map<Integer, Long> rcvCntrs; + + /** */ + private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter; + + /** */ + private transient AcknowledgeBuffer ackBuf; + /** */ private transient int cacheId; @@ -121,6 +148,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @param ignoreExpired Ignore expired events flag. * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. * @param taskHash Task name hash code. + * @param locCache {@code True} if local cache. */ public CacheContinuousQueryHandler( String cacheName, @@ -133,7 +161,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { boolean sync, boolean ignoreExpired, int taskHash, - boolean skipPrimaryCheck) { + boolean skipPrimaryCheck, + boolean locCache) { assert topic != null; assert locLsnr != null; @@ -149,6 +178,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.taskHash = taskHash; this.skipPrimaryCheck = skipPrimaryCheck; + if (locCache) + dupEvtFilter = F.alwaysTrue(); + else { + rcvCntrs = new ConcurrentHashMap<>(); + + dupEvtFilter = new DuplicateEventFilter(); + } + cacheId = CU.cacheId(cacheName); } @@ -185,8 +222,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (rmtFilter != null) ctx.resource().injectGeneric(rmtFilter); + backupQueue = new ConcurrentLinkedDeque8<>(); + + ackBuf = new AcknowledgeBuffer(); + final boolean loc = nodeId.equals(ctx.localNodeId()); + assert !skipPrimaryCheck || loc; + CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { @Override public void onExecution() { if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -207,15 +250,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } - @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, + @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, + boolean primary, boolean recordIgniteEvt) { if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) return; GridCacheContext<K, V> cctx = cacheContext(ctx); - if (cctx.isReplicated() && !skipPrimaryCheck && !primary) - return; + // skipPrimaryCheck is set only when listen locally for replicated cache events. + assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); boolean notify = true; @@ -229,30 +273,36 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } if (notify) { - if (loc) - locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); - else { - try { - if (cctx.deploymentEnabled() && ctx.discovery().node(nodeId) != null) { - evt.entry().prepareMarshal(cctx); - - cctx.deploy().prepare(evt.entry()); + try { + final CacheContinuousQueryEntry entry = evt.entry(); + + if (primary || skipPrimaryCheck) { + if (loc) { + if (dupEvtFilter.apply(entry)) { + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); + + if (!skipPrimaryCheck) + sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); + } } - else - evt.entry().prepareMarshal(cctx); + else { + prepareEntry(cctx, nodeId, entry); - ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true); + ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); + } } - catch (ClusterTopologyCheckedException ex) { - IgniteLogger log = ctx.log(getClass()); + else + backupQueue.add(entry); + } + catch (ClusterTopologyCheckedException ex) { + IgniteLogger log = ctx.log(getClass()); - if (log.isDebugEnabled()) - log.debug("Failed to send event notification to node, node left cluster " + - "[node=" + nodeId + ", err=" + ex + ']'); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); - } + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); } if (recordIgniteEvt) { @@ -283,6 +333,49 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister(); } + @Override public void cleanupBackupQueue(Map<Integer, Long> updateIdxs) { + Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator(); + + while (it.hasNext()) { + CacheContinuousQueryEntry backupEntry = it.next(); + + Long updateIdx = updateIdxs.get(backupEntry.partition()); + + if (updateIdx != null && backupEntry.updateIndex() <= updateIdx) + it.remove(); + } + } + + @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) { + if (backupQueue.isEmpty()) + return; + + try { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + for (CacheContinuousQueryEntry e : backupQueue) + prepareEntry(cctx, nodeId, e); + + ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic); + + backupQueue.clear(); + } + catch (IgniteCheckedException e) { + U.error(ctx.log(getClass()), "Failed to send backup event notification to node: " + nodeId, e); + } + } + + @Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) { + sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx); + } + + @Override public void onPartitionEvicted(int part) { + for (Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator(); it.hasNext();) { + if (it.next().partition() == part) + it.remove(); + } + } + @Override public boolean oldValueRequired() { return oldValRequired; } @@ -304,6 +397,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { return mgr.registerListener(routineId, lsnr, internal); } + /** + * @param cctx Context. + * @param nodeId ID of the node that started routine. + * @param entry Entry. + * @throws IgniteCheckedException In case of error. + */ + private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry) + throws IgniteCheckedException { + if (cctx.kernalContext().config().isPeerClassLoadingEnabled() && cctx.discovery().node(nodeId) != null) { + entry.prepareMarshal(cctx); + + cctx.deploy().prepare(entry); + } + else + entry.prepareMarshal(cctx); + } + /** {@inheritDoc} */ @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { // No-op. @@ -371,12 +481,40 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { return new CacheContinuousQueryEvent<>(cache, cctx, e); } - } + }, + dupEvtFilter ); locLsnr.onUpdated(evts); } + /** + * @param e Entry. + * @return {@code True} if listener should be notified. + */ + private boolean notifyListener(CacheContinuousQueryEntry e) { + Integer part = e.partition(); + + Long cntr = rcvCntrs.get(part); + + if (cntr != null) { + long cntr0 = cntr; + + if (e.updateIndex() > cntr0) { + // TODO IGNITE-426: remove assert. + assert e.updateIndex() == cntr0 + 1 : "Invalid entry [cntr=" + cntr + ", e=" + e + ']'; + + rcvCntrs.put(part, e.updateIndex()); + } + else + return false; + } + else + rcvCntrs.put(part, e.updateIndex()); + + return true; + } + /** {@inheritDoc} */ @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { assert ctx != null; @@ -397,6 +535,65 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousBatch createBatch() { + return new GridContinuousBatchAdapter(); + } + + /** {@inheritDoc} */ + @Override public void onBatchAcknowledged(final UUID routineId, + GridContinuousBatch batch, + final GridKernalContext ctx) { + sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx); + } + + /** + * @param t Acknowledge information. + * @param routineId Routine ID. + * @param ctx Context. + */ + private void sendBackupAcknowledge(final IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> t, + final UUID routineId, + final GridKernalContext ctx) { + if (t != null) { + ctx.closure().runLocalSafe(new Runnable() { + @Override public void run() { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + CacheContinuousQueryBatchAck msg = new CacheContinuousQueryBatchAck(cctx.cacheId(), + routineId, + t.get1()); + + Collection<ClusterNode> nodes = new HashSet<>(); + + for (AffinityTopologyVersion topVer : t.get2()) + nodes.addAll(ctx.discovery().cacheNodes(topVer)); + + for (ClusterNode node : nodes) { + if (!node.id().equals(ctx.localNodeId())) { + try { + cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + IgniteLogger log = ctx.log(getClass()); + + if (log.isDebugEnabled()) + log.debug("Failed to send acknowledge message, node left " + + "[msg=" + msg + ", node=" + node + ']'); + } + catch (IgniteCheckedException e) { + IgniteLogger log = ctx.log(getClass()); + + U.error(log, "Failed to send acknowledge message " + + "[msg=" + msg + ", node=" + node + ']', e); + } + } + } + } + }); + } + } + + /** {@inheritDoc} */ @Nullable @Override public Object orderedTopic() { return topic; } @@ -471,6 +668,106 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { return ctx.cache().<K, V>context().cacheContext(cacheId); } + /** */ + private static class AcknowledgeBuffer { + /** */ + private int size; + + /** */ + @GridToStringInclude + private Map<Integer, Long> updateIdxs = new HashMap<>(); + + /** */ + @GridToStringInclude + private Set<AffinityTopologyVersion> topVers = U.newHashSet(1); + + /** + * @param batch Batch. + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @SuppressWarnings("unchecked") + @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> + onAcknowledged(GridContinuousBatch batch) { + size += batch.size(); + + Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect(); + + for (CacheContinuousQueryEntry e : entries) + addEntry(e); + + return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; + } + + /** + * @param e Entry. + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> + onAcknowledged(CacheContinuousQueryEntry e) { + size++; + + addEntry(e); + + return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; + } + + /** + * @param e Entry. + */ + private void addEntry(CacheContinuousQueryEntry e) { + topVers.add(e.topologyVersion()); + + Long cntr0 = updateIdxs.get(e.partition()); + + if (cntr0 == null || e.updateIndex() > cntr0) + updateIdxs.put(e.partition(), e.updateIndex()); + } + + /** + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> + acknowledgeOnTimeout() { + return size > 0 ? acknowledgeData() : null; + } + + /** + * @return Tuple with acknowledge information. + */ + private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() { + assert size > 0; + + Map<Integer, Long> idxs = new HashMap<>(updateIdxs); + + IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res = + new IgniteBiTuple<>(idxs, topVers); + + topVers = U.newHashSet(1); + + size = 0; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AcknowledgeBuffer.class, this); + } + } + + /** + * + */ + private class DuplicateEventFilter implements IgnitePredicate<CacheContinuousQueryEntry> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public boolean apply(CacheContinuousQueryEntry e) { + return notifyListener(e); + } + } + /** * Deployable object. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index a3c19a9..2f9e111 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -17,6 +17,11 @@ package org.apache.ignite.internal.processors.cache.query.continuous; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; + +import java.util.*; + /** * Continuous query listener. */ @@ -33,7 +38,9 @@ interface CacheContinuousQueryListener<K, V> { * @param primary Primary flag. * @param recordIgniteEvt Whether to record event. */ - public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt); + public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, + boolean primary, + boolean recordIgniteEvt); /** * Listener unregistered callback. @@ -41,6 +48,31 @@ interface CacheContinuousQueryListener<K, V> { public void onUnregister(); /** + * Cleans backup queue. + * + * @param updateIdxs Update indexes map. + */ + public void cleanupBackupQueue(Map<Integer, Long> updateIdxs); + + /** + * Flushes backup queue. + * + * @param ctx Context. + * @param topVer Topology version. + */ + public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer); + + /** + * @param ctx Context. + */ + public void acknowledgeBackupOnTimeout(GridKernalContext ctx); + + /** + * @param part Partition. + */ + public void onPartitionEvicted(int part); + + /** * @return Whether old value is required. */ public boolean oldValueRequired(); http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index c7bf091..f0e9c0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; +import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; @@ -82,6 +83,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** */ private static final byte EXPIRED_FLAG = 0b1000; + /** */ + private static final long BACKUP_ACK_FREQ = 5000; + /** Listeners. */ private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>(); @@ -108,6 +112,26 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { @Override protected void start0() throws IgniteCheckedException { // Append cache name to the topic. topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name()); + + cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class, + new CI2<UUID, CacheContinuousQueryBatchAck>() { + @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) { + CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId()); + + if (lsnr != null) + lsnr.cleanupBackupQueue(msg.updateIndexes()); + } + }); + + cctx.time().schedule(new Runnable() { + @Override public void run() { + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext()); + + for (CacheContinuousQueryListener lsnr : intLsnrs.values()) + lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext()); + } + }, BACKUP_ACK_FREQ, BACKUP_ACK_FREQ); } /** {@inheritDoc} */ @@ -141,18 +165,25 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param key Key. * @param newVal New value. * @param oldVal Old value. + * @param primary {@code True} if called on primary node. * @param preload Whether update happened during preloading. + * @param updateIdx Update index. + * @param topVer Topology version. * @throws IgniteCheckedException In case of error. */ public void onEntryUpdated(GridCacheEntryEx e, KeyCacheObject key, CacheObject newVal, CacheObject oldVal, - boolean preload) + boolean primary, + boolean preload, + long updateIdx, + AffinityTopologyVersion topVer) throws IgniteCheckedException { assert e != null; assert key != null; + assert Thread.holdsLock(e) : e; boolean internal = e.isInternal() || !e.context().userCache(); @@ -179,8 +210,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean initialized = false; - boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE); - boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); for (CacheContinuousQueryListener lsnr : lsnrCol.values()) { if (preload && !lsnr.notifyExisting()) @@ -205,7 +235,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { evtType, key, newVal, - lsnr.oldValueRequired() ? oldVal : null); + lsnr.oldValueRequired() ? oldVal : null, + e.partition(), + updateIdx, + topVer); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -224,6 +257,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { throws IgniteCheckedException { assert e != null; assert key != null; + assert Thread.holdsLock(e) : e; if (e.isInternal()) return; @@ -255,7 +289,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { EXPIRED, key, null, - lsnr.oldValueRequired() ? oldVal : null); + lsnr.oldValueRequired() ? oldVal : null, + e.partition(), + 0, + null); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -373,6 +410,30 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * @param topVer Topology version. + */ + public void beforeExchange(AffinityTopologyVersion topVer) { + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + lsnr.flushBackupQueue(cctx.kernalContext(), topVer); + + for (CacheContinuousQueryListener lsnr : intLsnrs.values()) + lsnr.flushBackupQueue(cctx.kernalContext(), topVer); + } + + /** + * Partition evicted callback. + * + * @param part Partition number. + */ + public void onPartitionEvicted(int part) { + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + lsnr.onPartitionEvicted(part); + + for (CacheContinuousQueryListener lsnr : intLsnrs.values()) + lsnr.onPartitionEvicted(part); + } + + /** * @param locLsnr Local listener. * @param rmtFilter Remote filter. * @param bufSize Buffer size. @@ -417,7 +478,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { sync, ignoreExpired, taskNameHash, - skipPrimaryCheck); + skipPrimaryCheck, + cctx.isLocal()); IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ? F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue(); @@ -469,10 +531,19 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { GridCacheEntryEx e = it.next(); + CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry( + cctx.cacheId(), + CREATED, + e.key(), + e.rawGet(), + null, + 0, + 0, + null); + next = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), - cctx, - new CacheContinuousQueryEntry(cctx.cacheId(), CREATED, e.key(), e.rawGet(), null)); + cctx, entry); if (rmtFilter != null && !rmtFilter.evaluate(next)) next = null; @@ -637,6 +708,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * @param impl Listener. + * @param log Logger. */ JCacheQueryLocalListener(CacheEntryListener<K, V> impl, IgniteLogger log) { assert impl != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java new file mode 100644 index 0000000..2fef161 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java @@ -0,0 +1,7 @@ +package org.apache.ignite.internal.processors.continuous; + +/** + * Created by Nikolay on 02.09.2015. + */ +public interface GridContinuousBatch { +} http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java new file mode 100644 index 0000000..8e29e29 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java @@ -0,0 +1,7 @@ +package org.apache.ignite.internal.processors.continuous; + +/** + * Created by Nikolay on 02.09.2015. + */ +public class GridContinuousBatchAdapter { +} http://git-wip-us.apache.org/repos/asf/ignite/blob/384e57ca/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index d1cb3a9..15c9dd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -651,6 +651,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * @param nodeId ID of the node that started routine. * @param routineId Routine ID. + * @param objs Notification objects. + * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. + * @throws IgniteCheckedException In case of error. + */ + public void addBackupNotification(UUID nodeId, + final UUID routineId, + Collection<?> objs, + @Nullable Object orderedTopic) + throws IgniteCheckedException { + if (processorStopped) + return; + + final RemoteRoutineInfo info = rmtInfos.get(routineId); + + if (info != null) { + final GridContinuousBatch batch = info.addAll(objs); + + sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, true, null); + } + } + + /** + * @param nodeId ID of the node that started routine. + * @param routineId Routine ID. * @param obj Notification object. * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. * @param sync If {@code true} then waits for event acknowledgment. @@ -658,8 +682,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException In case of error. */ public void addNotification(UUID nodeId, - UUID routineId, - @Nullable Object obj, + final UUID routineId, + Object obj, @Nullable Object orderedTopic, boolean sync, boolean msg) @@ -673,7 +697,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (processorStopped) return; - RemoteRoutineInfo info = rmtInfos.get(routineId); + final RemoteRoutineInfo info = rmtInfos.get(routineId); if (info != null) { assert info.interval == 0 || !sync; @@ -686,7 +710,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { syncMsgFuts.put(futId, fut); try { - sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg); + sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null); } catch (IgniteCheckedException e) { syncMsgFuts.remove(futId); @@ -697,10 +721,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter { fut.get(); } else { - Collection<Object> toSnd = info.add(obj); + final GridContinuousBatch batch = info.add(obj); + + if (batch != null) { + CI1<IgniteException> ackC = new CI1<IgniteException>() { + @Override public void apply(IgniteException e) { + if (e == null) + info.hnd.onBatchAcknowledged(routineId, batch, ctx); + } + }; - if (toSnd != null) - sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg); + sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackC); + } } } } @@ -725,6 +757,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. * @param msg If {@code true} then sent data is collection of messages. + * @param ackC Ack closure. * @throws IgniteCheckedException In case of error. */ private void sendNotification(UUID nodeId, @@ -732,7 +765,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Nullable IgniteUuid futId, Collection<Object> toSnd, @Nullable Object orderedTopic, - boolean msg) throws IgniteCheckedException { + boolean msg, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert nodeId != null; assert routineId != null; assert toSnd != null; @@ -740,7 +774,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg), - orderedTopic); + orderedTopic, + ackC); } /** @@ -859,6 +894,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { try { sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null, false), + null, null); } catch (IgniteCheckedException e) { @@ -922,15 +958,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter { break; } - IgniteBiTuple<Collection<Object>, Long> t = info.checkInterval(); + IgniteBiTuple<GridContinuousBatch, Long> t = info.checkInterval(); - Collection<Object> toSnd = t.get1(); + final GridContinuousBatch batch = t.get1(); - if (toSnd != null && !toSnd.isEmpty()) { + if (batch != null && batch.size() > 0) { try { + Collection<Object> toSnd = batch.collect(); + boolean msg = toSnd.iterator().next() instanceof Message; - sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg); + CI1<IgniteException> ackC = new CI1<IgniteException>() { + @Override public void apply(IgniteException e) { + if (e == null) + info.hnd.onBatchAcknowledged(routineId, batch, ctx); + } + }; + + sendNotification(nodeId, + routineId, + null, + toSnd, + hnd.orderedTopic(), + msg, + ackC); } catch (ClusterTopologyCheckedException ignored) { if (log.isDebugEnabled()) @@ -1013,9 +1064,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param msg Message. * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. + * @param ackC Ack closure. * @throws IgniteCheckedException In case of error. */ - private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic) + private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert nodeId != null; assert msg != null; @@ -1023,7 +1076,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ClusterNode node = ctx.discovery().node(nodeId); if (node != null) - sendWithRetries(node, msg, orderedTopic); + sendWithRetries(node, msg, orderedTopic, ackC); else throw new ClusterTopologyCheckedException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId); } @@ -1033,14 +1086,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param msg Message. * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. + * @param ackC Ack closure. * @throws IgniteCheckedException In case of error. */ - private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic) - throws IgniteCheckedException { + private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert node != null; assert msg != null; - sendWithRetries(F.asList(node), msg, orderedTopic); + sendWithRetries(F.asList(node), msg, orderedTopic, ackC); } /** @@ -1048,10 +1102,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param msg Message. * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. + * @param ackC Ack closure. * @throws IgniteCheckedException In case of error. */ private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg, - @Nullable Object orderedTopic) throws IgniteCheckedException { + @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert !F.isEmpty(nodes); assert msg != null; @@ -1074,10 +1129,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { msg, SYSTEM_POOL, 0, - true); + true, + ackC); } else - ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL); + ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC); break; } @@ -1178,8 +1234,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Lock. */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); - /** Buffer. */ - private ConcurrentLinkedDeque8<Object> buf; + /** Batch. */ + private GridContinuousBatch batch; /** Last send time. */ private long lastSndTime = U.currentTimeMillis(); @@ -1210,7 +1266,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { this.interval = interval; this.autoUnsubscribe = autoUnsubscribe; - buf = new ConcurrentLinkedDeque8<>(); + batch = hnd.createBatch(); } /** @@ -1238,21 +1294,53 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param objs Objects to add. + * @return Batch to send. + */ + GridContinuousBatch addAll(Collection<?> objs) { + assert objs != null; + assert objs.size() > 0; + + GridContinuousBatch toSnd = null; + + lock.writeLock().lock(); + + try { + for (Object obj : objs) + batch.add(obj); + + toSnd = batch; + + batch = hnd.createBatch(); + + if (interval > 0) + lastSndTime = U.currentTimeMillis(); + } + finally { + lock.writeLock().unlock(); + } + + return toSnd; + } + + /** * @param obj Object to add. - * @return Object to send or {@code null} if there is nothing to send for now. + * @return Batch to send or {@code null} if there is nothing to send for now. */ - @Nullable Collection<Object> add(@Nullable Object obj) { - ConcurrentLinkedDeque8 buf0 = null; + @Nullable GridContinuousBatch add(Object obj) { + assert obj != null; - if (buf.sizex() >= bufSize - 1) { + GridContinuousBatch toSnd = null; + + if (batch.size() >= bufSize - 1) { lock.writeLock().lock(); try { - buf.add(obj); + batch.add(obj); - buf0 = buf; + toSnd = batch; - buf = new ConcurrentLinkedDeque8<>(); + batch = hnd.createBatch(); if (interval > 0) lastSndTime = U.currentTimeMillis(); @@ -1265,34 +1353,25 @@ public class GridContinuousProcessor extends GridProcessorAdapter { lock.readLock().lock(); try { - buf.add(obj); + batch.add(obj); } finally { lock.readLock().unlock(); } } - Collection<Object> toSnd = null; - - if (buf0 != null) { - toSnd = new ArrayList<>(buf0.sizex()); - - for (Object o : buf0) - toSnd.add(o); - } - return toSnd; } /** - * @return Tuple with objects to sleep (or {@code null} if there is nothing to + * @return Tuple with batch to send (or {@code null} if there is nothing to * send for now) and time interval after next check is needed. */ @SuppressWarnings("TooBroadScope") - IgniteBiTuple<Collection<Object>, Long> checkInterval() { + IgniteBiTuple<GridContinuousBatch, Long> checkInterval() { assert interval > 0; - Collection<Object> toSnd = null; + GridContinuousBatch toSnd = null; long diff; long now = U.currentTimeMillis(); @@ -1302,10 +1381,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { try { diff = now - lastSndTime; - if (diff >= interval && !buf.isEmpty()) { - toSnd = buf; + if (diff >= interval && batch.size() > 0) { + toSnd = batch; - buf = new ConcurrentLinkedDeque8<>(); + batch = hnd.createBatch(); lastSndTime = now; }
