http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index e7706dd..e5347c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -24,17 +24,12 @@ import java.io.ObjectOutput; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.Set; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; @@ -42,23 +37,22 @@ import javax.cache.event.EventType; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; -import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener; @@ -67,9 +61,6 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; -import org.apache.ignite.internal.util.GridConcurrentSkipListSet; -import org.apache.ignite.internal.util.GridLongList; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; @@ -81,7 +72,6 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; @@ -94,7 +84,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private static final long serialVersionUID = 0L; /** */ - private static final int BACKUP_ACK_THRESHOLD = 100; + static final int BACKUP_ACK_THRESHOLD = + IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD", 100); + + /** */ + static final int LSNR_MAX_BUF_SIZE = + IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE", 10_000); /** Cache name. */ private String cacheName; @@ -109,7 +104,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private CacheEntryEventSerializableFilter<K, V> rmtFilter; /** Deployable object for filter. */ - private DeployableObject rmtFilterDep; + private CacheContinuousQueryDeployableObject rmtFilterDep; /** Internal flag. */ private boolean internal; @@ -132,9 +127,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** Whether to skip primary check for REPLICATED cache. */ private transient boolean skipPrimaryCheck; - /** Backup queue. */ - private transient volatile Collection<CacheContinuousQueryEntry> backupQueue; - /** */ private boolean locCache; @@ -142,13 +134,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private boolean keepBinary; /** */ - private transient ConcurrentMap<Integer, PartitionRecovery> rcvs; + private transient ConcurrentMap<Integer, CacheContinuousQueryPartitionRecovery> rcvs; /** */ - private transient ConcurrentMap<Integer, EntryBuffer> entryBufs; + private transient ConcurrentMap<Integer, CacheContinuousQueryEventBuffer> entryBufs; /** */ - private transient AcknowledgeBuffer ackBuf; + private transient CacheContinuousQueryAcknowledgeBuffer ackBuf; /** */ private transient int cacheId; @@ -163,6 +155,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private transient volatile AffinityTopologyVersion initTopVer; /** */ + private transient volatile boolean nodeLeft; + + /** */ private transient boolean ignoreClsNotFound; /** */ @@ -337,9 +332,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler entryBufs = new ConcurrentHashMap<>(); - backupQueue = new ConcurrentLinkedDeque8<>(); - - ackBuf = new AcknowledgeBuffer(); + ackBuf = new CacheContinuousQueryAcknowledgeBuffer(); rcvs = new ConcurrentHashMap<>(); @@ -409,7 +402,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler ctx.asyncCallbackPool().execute(clsr, evt.partitionId()); } else { - final boolean notify = filter(evt, primary); + final boolean notify = filter(evt); if (log.isDebugEnabled()) log.debug("Filter invoked for event [evt=" + evt + ", primary=" + primary @@ -429,6 +422,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler }, sync); } } + else + handleBackupEntry(cctx, evt.entry()); } } @@ -438,50 +433,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) { - Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; - - if (backupQueue0 != null) { - Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator(); - - while (it.hasNext()) { - CacheContinuousQueryEntry backupEntry = it.next(); + for (Map.Entry<Integer, Long> e : updateCntrs.entrySet()) { + CacheContinuousQueryEventBuffer buf = entryBufs.get(e.getKey()); - Long updateCntr = updateCntrs.get(backupEntry.partition()); - - if (updateCntr != null && backupEntry.updateCounter() <= updateCntr) - it.remove(); - } + if (buf != null) + buf.cleanupBackupQueue(e.getValue()); } } @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) { assert topVer != null; - Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; + try { + GridCacheContext<K, V> cctx = cacheContext(ctx); - if (backupQueue0 == null) - return; + ClusterNode node = ctx.discovery().node(nodeId); - try { - ClusterNode nodeId0 = ctx.discovery().node(nodeId); + for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) { + CacheContinuousQueryEventBuffer buf = bufE.getValue(); - if (nodeId0 != null) { - GridCacheContext<K, V> cctx = cacheContext(ctx); + Collection<CacheContinuousQueryEntry> backupQueue = buf.flushOnExchange(); - for (CacheContinuousQueryEntry e : backupQueue0) { - if (!e.isFiltered()) - prepareEntry(cctx, nodeId, e); + if (backupQueue != null && node != null) { + for (CacheContinuousQueryEntry e : backupQueue) { + e.markBackup(); - e.topologyVersion(topVer); - } + if (!e.isFiltered()) + prepareEntry(cctx, nodeId, e); + } - ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue0, topic); + ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic); + } } - else - // Node which start CQ leave topology. Not needed to put data to backup queue. - backupQueue = null; - - backupQueue0.clear(); } catch (IgniteCheckedException e) { U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), @@ -505,14 +488,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } @Override public void onPartitionEvicted(int part) { - Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; - - if (backupQueue0 != null) { - for (Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator(); it.hasNext(); ) { - if (it.next().partition() == part) - it.remove(); - } - } + entryBufs.remove(part); } @Override public boolean oldValueRequired() { @@ -739,17 +715,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); } - PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion()); + CacheContinuousQueryPartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion()); return rec.collectEntries(e, cctx, cache); } /** - * @param primary Primary. * @param evt Query event. * @return {@code True} if event passed filter otherwise {@code true}. */ - public boolean filter(CacheContinuousQueryEvent evt, boolean primary) { + public boolean filter(CacheContinuousQueryEvent evt) { CacheContinuousQueryEntry entry = evt.entry(); boolean notify = !entry.isFiltered(); @@ -765,15 +740,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (!notify) entry.markFiltered(); - if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) { - entry.markBackup(); - - Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; - - if (backupQueue0 != null) - backupQueue0.add(entry.forBackupQueue()); - } - return notify; } @@ -811,13 +777,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (!entry.isFiltered()) prepareEntry(cctx, nodeId, entry); - CacheContinuousQueryEntry e = handleEntry(entry); + Object entryOrList = handleEntry(cctx, entry); - if (e != null) { + if (entryOrList != null) { if (log.isDebugEnabled()) - log.debug("Send the following event to listener: " + e); + log.debug("Send the following event to listener: " + entryOrList); - ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); + ctx.continuous().addNotification(nodeId, routineId, entryOrList, topic, sync, true); } } } @@ -865,7 +831,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (internal) return; - for (PartitionRecovery rec : rcvs.values()) + for (CacheContinuousQueryPartitionRecovery rec : rcvs.values()) rec.resetTopologyCache(); } @@ -875,12 +841,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param topVer Topology version for current operation. * @return Partition recovery. */ - @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, + @NotNull private CacheContinuousQueryPartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId, AffinityTopologyVersion topVer) { assert topVer != null && topVer.topologyVersion() > 0 : topVer; - PartitionRecovery rec = rcvs.get(partId); + CacheContinuousQueryPartitionRecovery rec = rcvs.get(partId); if (rec == null) { T2<Long, Long> partCntrs = null; @@ -905,10 +871,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler else if (initUpdCntrs != null) partCntrs = initUpdCntrs.get(partId); - rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer, + rec = new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer, partCntrs != null ? partCntrs.get2() : null); - PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec); + CacheContinuousQueryPartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec); if (oldRec != null) rec = oldRec; @@ -918,10 +884,24 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** + * @param cctx Cache context. + * @param e Entry. + */ + private void handleBackupEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) { + if (internal || e.updateCounter() == -1L || nodeLeft) // Skip internal query and expire entries. + return; + + CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition()); + + buf.processEntry(e.copyWithDataReset(), true); + } + + /** + * @param cctx Cache context. * @param e Entry. * @return Entry. */ - private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) { + private Object handleEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) { assert e != null; assert entryBufs != null; @@ -934,354 +914,52 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler // Initial query entry. // This events should be fired immediately. - if (e.updateCounter() == -1) + if (e.updateCounter() == -1L) return e; - EntryBuffer buf = entryBufs.get(e.partition()); + CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition()); - if (buf == null) { - buf = new EntryBuffer(); - - EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf); - - if (oldRec != null) - buf = oldRec; - } - - return buf.handle(e); + return buf.processEntry(e, false); } /** - * + * @param cctx Cache context. + * @param part Partition. + * @return Event buffer. */ - private static class PartitionRecovery { - /** Event which means hole in sequence. */ - private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry(); - - /** */ - private final static int MAX_BUFF_SIZE = 100; - - /** */ - private IgniteLogger log; - - /** */ - private long lastFiredEvt; - - /** */ - private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE; - - /** */ - private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); - - /** - * @param log Logger. - * @param topVer Topology version. - * @param initCntr Update counters. - */ - PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) { - this.log = log; - - if (initCntr != null) { - assert topVer.topologyVersion() > 0 : topVer; - - this.lastFiredEvt = initCntr; - - curTop = topVer; - } - } - - /** - * Resets cached topology. - */ - void resetTopologyCache() { - curTop = AffinityTopologyVersion.NONE; - } - - /** - * Add continuous entry. - * - * @param cctx Cache context. - * @param cache Cache. - * @param entry Cache continuous query entry. - * @return Collection entries which will be fired. This collection should contains only non-filtered events. - */ - <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries( - CacheContinuousQueryEntry entry, - GridCacheContext cctx, - IgniteCache cache - ) { - assert entry != null; - - if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. - assert entry.updateCounter() == 0L : entry; - - return F.<CacheEntryEvent<? extends K, ? extends V>> - asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); - } - - List<CacheEntryEvent<? extends K, ? extends V>> entries; - - synchronized (pendingEvts) { - if (log.isDebugEnabled()) { - log.debug("Handling event [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - } - - // Received first event. - if (curTop == AffinityTopologyVersion.NONE) { - lastFiredEvt = entry.updateCounter(); - - curTop = entry.topologyVersion(); - - if (log.isDebugEnabled()) { - log.debug("First event [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + ']'); - } - - return !entry.isFiltered() ? - F.<CacheEntryEvent<? extends K, ? extends V>> - asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) : - Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); - } - - if (curTop.compareTo(entry.topologyVersion()) < 0) { - if (entry.updateCounter() == 1L && !entry.isBackup()) { - entries = new ArrayList<>(pendingEvts.size()); - - for (CacheContinuousQueryEntry evt : pendingEvts.values()) { - if (evt != HOLE && !evt.isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt)); - } - - pendingEvts.clear(); - - curTop = entry.topologyVersion(); - - lastFiredEvt = entry.updateCounter(); - - if (!entry.isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); + private CacheContinuousQueryEventBuffer partitionBuffer(final GridCacheContext cctx, int part) { + CacheContinuousQueryEventBuffer buf = entryBufs.get(part); - if (log.isDebugEnabled()) - log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - - return entries; - } - - curTop = entry.topologyVersion(); - } - - // Check duplicate. - if (entry.updateCounter() > lastFiredEvt) { - pendingEvts.put(entry.updateCounter(), entry); - - // Put filtered events. - if (entry.filteredEvents() != null) { - for (long cnrt : entry.filteredEvents()) { - if (cnrt > lastFiredEvt) - pendingEvts.put(cnrt, HOLE); - } - } - } - else { - if (log.isDebugEnabled()) - log.debug("Skip duplicate continuous query message: " + entry); - - return Collections.emptyList(); - } - - if (pendingEvts.isEmpty()) { - if (log.isDebugEnabled()) { - log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + ']'); - } - - return Collections.emptyList(); - } - - Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator(); - - entries = new ArrayList<>(); - - if (pendingEvts.size() >= MAX_BUFF_SIZE) { - for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) { - Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); - - if (e.getValue() != HOLE && !e.getValue().isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); - - lastFiredEvt = e.getKey(); - - iter.remove(); - } - - if (log.isDebugEnabled()) { - log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - } - } - else { - // Elements are consistently. - while (iter.hasNext()) { - Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); - - if (e.getKey() == lastFiredEvt + 1) { - ++lastFiredEvt; + if (buf == null) { + buf = new CacheContinuousQueryEventBuffer(part) { + @Override protected long currentPartitionCounter() { + GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false); - if (e.getValue() != HOLE && !e.getValue().isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); + if (locPart == null) + return -1L; - iter.remove(); - } - else - break; - } + return locPart.updateCounter(); } - } - - if (log.isDebugEnabled()) { - log.debug("Will send to listener the following events [entries=" + entries + - ", lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - } - - return entries; - } - } - - /** - * - */ - private static class EntryBuffer { - /** */ - private final static int MAX_BUFF_SIZE = 100; + }; - /** */ - private final GridConcurrentSkipListSet<Long> buf = new GridConcurrentSkipListSet<>(); - - /** */ - private AtomicLong lastFiredCntr = new AtomicLong(); - - /** - * @param newVal New value. - * @return Old value if previous value less than new value otherwise {@code -1}. - */ - private long updateFiredCounter(long newVal) { - long prevVal = lastFiredCntr.get(); - - while (prevVal < newVal) { - if (lastFiredCntr.compareAndSet(prevVal, newVal)) - return prevVal; - else - prevVal = lastFiredCntr.get(); - } + CacheContinuousQueryEventBuffer oldBuf = entryBufs.putIfAbsent(part, buf); - return prevVal >= newVal ? -1 : prevVal; + if (oldBuf != null) + buf = oldBuf; } - /** - * Add continuous entry. - * - * @param e Cache continuous query entry. - * @return Collection entries which will be fired. - */ - public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) { - assert e != null; - - if (e.isFiltered()) { - Long last = buf.lastx(); - Long first = buf.firstx(); - - if (last != null && first != null && last - first >= MAX_BUFF_SIZE) { - NavigableSet<Long> prevHoles = buf.subSet(first, true, last, true); - - GridLongList filteredEvts = new GridLongList((int)(last - first)); - - int size = 0; - - Long cntr; - - while ((cntr = prevHoles.pollFirst()) != null) { - filteredEvts.add(cntr); - - ++size; - } - - filteredEvts.truncate(size, true); - - e.filteredEvents(filteredEvts); - - return e; - } - - if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) - return e; - else { - buf.add(e.updateCounter()); - - // Double check. If another thread sent a event with counter higher than this event. - if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) { - buf.remove(e.updateCounter()); - - return e; - } - else - return null; - } - } - else { - long prevVal = updateFiredCounter(e.updateCounter()); - - if (prevVal == -1) - return e; - else { - NavigableSet<Long> prevHoles = buf.subSet(prevVal, true, e.updateCounter(), true); - - GridLongList filteredEvts = new GridLongList((int)(e.updateCounter() - prevVal)); - - int size = 0; - - Long cntr; - - while ((cntr = prevHoles.pollFirst()) != null) { - filteredEvts.add(cntr); - - ++size; - } - - filteredEvts.truncate(size, true); - - e.filteredEvents(filteredEvts); - - return e; - } - } - } + return buf; } /** {@inheritDoc} */ @Override public void onNodeLeft() { - Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; + nodeLeft = true; - if (backupQueue0 != null) - backupQueue = null; + for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) { + CacheContinuousQueryEventBuffer buf = bufE.getValue(); + + buf.flushOnExchange(); + } } /** {@inheritDoc} */ @@ -1290,7 +968,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert ctx.config().isPeerClassLoadingEnabled(); if (rmtFilter != null && !U.isGrid(rmtFilter.getClass())) - rmtFilterDep = new DeployableObject(rmtFilter, ctx); + rmtFilterDep = new CacheContinuousQueryDeployableObject(rmtFilter, ctx); } /** {@inheritDoc} */ @@ -1411,7 +1089,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler boolean b = in.readBoolean(); if (b) - rmtFilterDep = (DeployableObject)in.readObject(); + rmtFilterDep = (CacheContinuousQueryDeployableObject)in.readObject(); else rmtFilter = (CacheEntryEventSerializableFilter<K, V>)in.readObject(); @@ -1436,95 +1114,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return ctx.cache().<K, V>context().cacheContext(cacheId); } - /** */ - private static class AcknowledgeBuffer { - /** */ - private int size; - - /** */ - @GridToStringInclude - private Map<Integer, Long> updateCntrs = new HashMap<>(); - - /** */ - @GridToStringInclude - private Set<AffinityTopologyVersion> topVers = U.newHashSet(1); - - /** - * @param batch Batch. - * @return Non-null tuple if acknowledge should be sent to backups. - */ - @SuppressWarnings("unchecked") - @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> - onAcknowledged(GridContinuousBatch batch) { - assert batch instanceof GridContinuousQueryBatch; - - size += ((GridContinuousQueryBatch)batch).entriesCount(); - - Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect(); - - for (CacheContinuousQueryEntry e : entries) - addEntry(e); - - return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; - } - - /** - * @param e Entry. - * @return Non-null tuple if acknowledge should be sent to backups. - */ - @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> - onAcknowledged(CacheContinuousQueryEntry e) { - size++; - - addEntry(e); - - return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; - } - - /** - * @param e Entry. - */ - private void addEntry(CacheContinuousQueryEntry e) { - topVers.add(e.topologyVersion()); - - Long cntr0 = updateCntrs.get(e.partition()); - - if (cntr0 == null || e.updateCounter() > cntr0) - updateCntrs.put(e.partition(), e.updateCounter()); - } - - /** - * @return Non-null tuple if acknowledge should be sent to backups. - */ - @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> - acknowledgeOnTimeout() { - return size > 0 ? acknowledgeData() : null; - } - - /** - * @return Tuple with acknowledge information. - */ - private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() { - assert size > 0; - - Map<Integer, Long> cntrs = new HashMap<>(updateCntrs); - - IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res = - new IgniteBiTuple<>(cntrs, topVers); - - topVers = U.newHashSet(1); - - size = 0; - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(AcknowledgeBuffer.class, this); - } - } - /** * */ @@ -1560,44 +1149,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** {@inheritDoc} */ @Override public void run() { - final boolean notify = filter(evt, primary); - - if (!primary()) - return; + final boolean notify = filter(evt); - if (fut == null) { - onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + if (primary || skipPrimaryCheck) { + if (fut == null) { + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); - return; - } + return; + } - if (fut.isDone()) { - if (fut.error() != null) - evt.entry().markFiltered(); + if (fut.isDone()) { + if (fut.error() != null) + evt.entry().markFiltered(); - onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); - } - else { - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - if (f.error() != null) - evt.entry().markFiltered(); - - ctx.asyncCallbackPool().execute(new Runnable() { - @Override public void run() { - onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); - } - }, evt.entry().partition()); - } - }); + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + } + else { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + if (f.error() != null) + evt.entry().markFiltered(); + + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + } + }, evt.entry().partition()); + } + }); + } } - } - - /** - * @return {@code True} if event fired on this node. - */ - private boolean primary() { - return primary || skipPrimaryCheck; + else + handleBackupEntry(cacheContext(ctx), evt.entry()); } /** {@inheritDoc} */ @@ -1606,82 +1189,4 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - /** - * Deployable object. - */ - protected static class DeployableObject implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Serialized object. */ - private byte[] bytes; - - /** Deployment class name. */ - private String clsName; - - /** Deployment info. */ - private GridDeploymentInfo depInfo; - - /** - * Required by {@link Externalizable}. - */ - public DeployableObject() { - // No-op. - } - - /** - * @param obj Object. - * @param ctx Kernal context. - * @throws IgniteCheckedException In case of error. - */ - protected DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException { - assert obj != null; - assert ctx != null; - - Class cls = U.detectClass(obj); - - clsName = cls.getName(); - - GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); - - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj); - - depInfo = new GridDeploymentInfoBean(dep); - - bytes = U.marshal(ctx, obj); - } - - /** - * @param nodeId Node ID. - * @param ctx Kernal context. - * @return Deserialized object. - * @throws IgniteCheckedException In case of error. - */ - <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { - assert ctx != null; - - GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, - depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); - - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); - - return U.unmarshal(ctx, bytes, U.resolveClassLoader(dep.classLoader(), ctx.config())); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeByteArray(out, bytes); - U.writeString(out, clsName); - out.writeObject(depInfo); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - bytes = U.readByteArray(in); - clsName = U.readString(in); - depInfo = (GridDeploymentInfo)in.readObject(); - } - } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java index 7aef4dd..e48d22e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java @@ -44,7 +44,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan private Factory<? extends CacheEntryEventFilter> rmtFilterFactory; /** Deployable object for filter factory. */ - private DeployableObject rmtFilterFactoryDep; + private CacheContinuousQueryDeployableObject rmtFilterFactoryDep; /** Event types for JCache API. */ private byte types; @@ -122,7 +122,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan super.p2pMarshal(ctx); if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass())) - rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx); + rmtFilterFactoryDep = new CacheContinuousQueryDeployableObject(rmtFilterFactory, ctx); } /** {@inheritDoc} */ @@ -167,7 +167,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan boolean b = in.readBoolean(); if (b) - rmtFilterFactoryDep = (DeployableObject)in.readObject(); + rmtFilterFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject(); else rmtFilterFactory = (Factory)in.readObject(); http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index acf351f..1a655e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -193,7 +193,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { lsnr.keepBinary(), partId, updCntr, - topVer); + topVer, + (byte)0); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -339,7 +340,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { lsnr.keepBinary(), partId, updateCntr, - topVer); + topVer, + (byte)0); IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name()); @@ -400,7 +402,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { lsnr.keepBinary(), e.partition(), -1, - null); + null, + (byte)0); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -568,9 +571,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** - * @param topVer Topology version. + * @param topVer Finished exchange topology version. */ - public void beforeExchange(AffinityTopologyVersion topVer) { + public void flushBackupQueue(AffinityTopologyVersion topVer) { for (CacheContinuousQueryListener lsnr : lsnrs.values()) lsnr.flushBackupQueue(cctx.kernalContext(), topVer); } @@ -703,7 +706,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { keepBinary, 0, -1, - null); + null, + (byte)0); next = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java new file mode 100644 index 0000000..e210c24 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import javax.cache.event.CacheEntryEvent; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +class CacheContinuousQueryPartitionRecovery { + /** Event which means hole in sequence. */ + private static final CacheContinuousQueryEntry HOLE; + + static { + HOLE = new CacheContinuousQueryEntry(); + + HOLE.markFiltered(); + } + + /** */ + private final static int MAX_BUFF_SIZE = CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE; + + /** */ + private IgniteLogger log; + + /** */ + private long lastFiredEvt; + + /** */ + private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE; + + /** */ + private final TreeMap<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); + + /** + * @param log Logger. + * @param topVer Topology version. + * @param initCntr Update counters. + */ + CacheContinuousQueryPartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) { + this.log = log; + + if (initCntr != null) { + assert topVer.topologyVersion() > 0 : topVer; + + this.lastFiredEvt = initCntr; + + curTop = topVer; + } + } + + /** + * Resets cached topology. + */ + void resetTopologyCache() { + curTop = AffinityTopologyVersion.NONE; + } + + /** + * Add continuous entry. + * + * @param cctx Cache context. + * @param cache Cache. + * @param entry Cache continuous query entry. + * @return Collection entries which will be fired. This collection should contains only non-filtered events. + */ + <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries( + CacheContinuousQueryEntry entry, + GridCacheContext cctx, + IgniteCache cache + ) { + assert entry != null; + + if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. + assert entry.updateCounter() == 0L : entry; + + return F.<CacheEntryEvent<? extends K, ? extends V>> + asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); + } + + List<CacheEntryEvent<? extends K, ? extends V>> entries; + + synchronized (pendingEvts) { + if (log.isDebugEnabled()) { + log.debug("Handling event [lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + + ", pendingEvts=" + pendingEvts + ']'); + } + + // Received first event. + if (curTop == AffinityTopologyVersion.NONE) { + lastFiredEvt = entry.updateCounter(); + + curTop = entry.topologyVersion(); + + if (log.isDebugEnabled()) { + log.debug("First event [lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + ']'); + } + + return !entry.isFiltered() ? + F.<CacheEntryEvent<? extends K, ? extends V>> + asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) : + Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); + } + + if (curTop.compareTo(entry.topologyVersion()) < 0) { + if (entry.updateCounter() == 1L && !entry.isBackup()) { + entries = new ArrayList<>(pendingEvts.size()); + + for (CacheContinuousQueryEntry evt : pendingEvts.values()) { + if (evt != HOLE && !evt.isFiltered()) + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt)); + } + + pendingEvts.clear(); + + curTop = entry.topologyVersion(); + + lastFiredEvt = entry.updateCounter(); + + if (!entry.isFiltered()) + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); + + if (log.isDebugEnabled()) + log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + + ", pendingEvts=" + pendingEvts + ']'); + + return entries; + } + + curTop = entry.topologyVersion(); + } + + // Check duplicate. + if (entry.updateCounter() > lastFiredEvt) + pendingEvts.put(entry.updateCounter(), entry); + else { + if (log.isDebugEnabled()) + log.debug("Skip duplicate continuous query message: " + entry); + + return Collections.emptyList(); + } + + if (pendingEvts.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + ']'); + } + + return Collections.emptyList(); + } + + Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator(); + + entries = new ArrayList<>(); + + if (pendingEvts.size() >= MAX_BUFF_SIZE) { + if (log.isDebugEnabled()) { + log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + + ", pendingEvts=" + pendingEvts + ']'); + } + + LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() + + ", bufSize=" + MAX_BUFF_SIZE + + ", partId=" + entry.partition() + ']'); + + for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) { + Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); + + if (e.getValue() != HOLE && !e.getValue().isFiltered()) + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); + + lastFiredEvt = e.getKey(); + + iter.remove(); + } + } + else { + boolean skippedFiltered = false; + + while (iter.hasNext()) { + Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); + + CacheContinuousQueryEntry pending = e.getValue(); + + long filtered = pending.filteredCount(); + + boolean fire = e.getKey() == lastFiredEvt + 1;; + + if (!fire && filtered > 0) + fire = e.getKey() - filtered <= lastFiredEvt + 1; + + if (fire) { + lastFiredEvt = e.getKey(); + + if (e.getValue() != HOLE && !e.getValue().isFiltered()) + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, pending)); + + iter.remove(); + } + else { + if (pending.isFiltered()) + skippedFiltered = true; + else + break; + } + } + + if (skippedFiltered) + pendingEvts.headMap(lastFiredEvt).clear(); + } + } + + if (log.isDebugEnabled()) { + log.debug("Will send to listener the following events [entries=" + entries + + ", lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + + ", pendingEvts=" + pendingEvts + ']'); + } + + return entries; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java index 4540de1..597eae8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java @@ -25,7 +25,7 @@ import org.jsr166.ConcurrentLinkedDeque8; */ public class GridContinuousBatchAdapter implements GridContinuousBatch { /** Buffer. */ - private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>(); + protected final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>(); /** {@inheritDoc} */ @Override public void add(Object obj) { http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index abcd8ea..a72dcd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -75,7 +75,6 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; @@ -872,10 +871,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { sendNotification(nodeId, routineId, null, toSnd, orderedTopic, true, null); } else { - LocalRoutineInfo localRoutineInfo = locInfos.get(routineId); + LocalRoutineInfo locRoutineInfo = locInfos.get(routineId); - if (localRoutineInfo != null) - localRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx); + if (locRoutineInfo != null) + locRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx); } } @@ -897,7 +896,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { throws IgniteCheckedException { assert nodeId != null; assert routineId != null; - assert !msg || obj instanceof Message : obj; + assert !msg || (obj instanceof Message || obj instanceof Collection) : obj; assert !nodeId.equals(ctx.localNodeId()); @@ -917,7 +916,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter { syncMsgFuts.put(futId, fut); try { - sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg, null); + sendNotification(nodeId, + routineId, + futId, + obj instanceof Collection ? (Collection)obj : F.asList(obj), + null, + msg, + null); info.hnd.onBatchAcknowledged(routineId, info.add(obj), ctx); } @@ -1563,7 +1568,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridContinuousBatch addAll(Collection<?> objs) { assert objs != null; - GridContinuousBatch toSnd = null; + GridContinuousBatch toSnd; lock.writeLock().lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java index c5d854b..0eba44b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.continuous; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry; @@ -31,11 +32,20 @@ public class GridContinuousQueryBatch extends GridContinuousBatchAdapter { /** {@inheritDoc} */ @Override public void add(Object obj) { assert obj != null; - assert obj instanceof CacheContinuousQueryEntry; + assert obj instanceof CacheContinuousQueryEntry || obj instanceof List; - super.add(obj); + if (obj instanceof CacheContinuousQueryEntry) { + buf.add(obj); - size.addAndGet(((CacheContinuousQueryEntry)obj).size()); + size.incrementAndGet(); + } + else { + List<Object> objs = (List<Object>)obj; + + buf.addAll(objs); + + size.addAndGet(objs.size()); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java index 3cab9e0..d505d19 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java @@ -25,7 +25,6 @@ import org.apache.ignite.cache.CacheMode; */ public class CacheContinuousQueryAsyncFailoverAtomicSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest { - /** {@inheritDoc} */ @Override protected CacheMode cacheMode() { return CacheMode.PARTITIONED; http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java new file mode 100644 index 0000000..9c7c836 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatePartitionAtomic() throws Exception { + concurrentUpdatePartition(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatePartitionTx() throws Exception { + concurrentUpdatePartition(TRANSACTIONAL); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + private void concurrentUpdatePartition(CacheAtomicityMode atomicityMode) throws Exception { + Ignite srv = startGrid(0); + + client = true; + + Ignite client = startGrid(1); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); + + IgniteCache clientCache = client.createCache(ccfg); + + final AtomicInteger evtCnt = new AtomicInteger(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent evt : evts) { + assertNotNull(evt.getKey()); + assertNotNull(evt.getValue()); + + evtCnt.incrementAndGet(); + } + } + }); + + clientCache.query(qry); + + Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME); + + final List<Integer> keys = new ArrayList<>(); + + final int KEYS = 10; + + for (int i = 0; i < 100_000; i++) { + if (aff.partition(i) == 0) { + keys.add(i); + + if (keys.size() == KEYS) + break; + } + } + + assertEquals(KEYS, keys.size()); + + final int THREADS = 10; + final int UPDATES = 1000; + + final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 15; i++) { + log.info("Iteration: " + i); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < UPDATES; i++) + srvCache.put(keys.get(rnd.nextInt(KEYS)), i); + + return null; + } + }, THREADS, "update"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + log.info("Events: " + evtCnt.get()); + + return evtCnt.get() >= THREADS * UPDATES; + } + }, 5000); + + assertEquals(THREADS * UPDATES, evtCnt.get()); + + evtCnt.set(0); + } + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatesAndQueryStartAtomic() throws Exception { + concurrentUpdatesAndQueryStart(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatesAndQueryStartTx() throws Exception { + concurrentUpdatesAndQueryStart(TRANSACTIONAL); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + private void concurrentUpdatesAndQueryStart(CacheAtomicityMode atomicityMode) throws Exception { + Ignite srv = startGrid(0); + + client = true; + + Ignite client = startGrid(1); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); + + IgniteCache clientCache = client.createCache(ccfg); + + Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME); + + final List<Integer> keys = new ArrayList<>(); + + final int KEYS = 10; + + for (int i = 0; i < 100_000; i++) { + if (aff.partition(i) == 0) { + keys.add(i); + + if (keys.size() == KEYS) + break; + } + } + + assertEquals(KEYS, keys.size()); + + final int THREADS = 10; + final int UPDATES = 1000; + + for (int i = 0; i < 5; i++) { + log.info("Iteration: " + i); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + final AtomicInteger evtCnt = new AtomicInteger(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent evt : evts) { + assertNotNull(evt.getKey()); + assertNotNull(evt.getValue()); + + if ((Integer)evt.getValue() >= 0) + evtCnt.incrementAndGet(); + } + } + }); + + QueryCursor cur; + + final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME); + + final AtomicBoolean stop = new AtomicBoolean(); + + try { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) + srvCache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200); + + return null; + } + }, THREADS, "update"); + + U.sleep(1000); + + cur = clientCache.query(qry); + + U.sleep(1000); + + stop.set(true); + + fut.get(); + } + finally { + stop.set(true); + } + + GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < UPDATES; i++) + srvCache.put(keys.get(rnd.nextInt(KEYS)), i); + + return null; + } + }, THREADS, "update"); + + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + log.info("Events: " + evtCnt.get()); + + return evtCnt.get() >= THREADS * UPDATES; + } + }, 5000); + + assertEquals(THREADS * UPDATES, evtCnt.get()); + + cur.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java new file mode 100644 index 0000000..382f166 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CyclicBarrier; +import javax.cache.event.EventType; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +@SuppressWarnings("unchecked") +public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest { + /** + * @throws Exception If failed. + */ + public void testBuffer1() throws Exception { + testBuffer(1); + } + + /** + * @throws Exception If failed. + */ + public void testBuffer2() throws Exception { + for (int i = 0; i < 10; i++) { + log.info("Iteration: " + i); + + testBuffer(10); + } + } + + /** + * @param threads Threads number. + * @throws Exception If failed. + */ + private void testBuffer(int threads) throws Exception { + long seed = System.nanoTime(); + + Random rnd = new Random(seed); + + log.info("Start test, seed: " + seed); + + for (int i = 0; i < 10; i++) { + int cnt = rnd.nextInt(10_000) + 1; + + testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.5f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.9f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.99f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.01f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.f, threads); + } + + CacheContinuousQueryEventBuffer b = new CacheContinuousQueryEventBuffer(0); + + long cntr = 1; + + for (int i = 0; i < 10; i++) { + int cnt = rnd.nextInt(10_000) + 1; + float ratio = rnd.nextFloat(); + + testBuffer(rnd, b, cnt, cntr, ratio, threads); + + cntr += cnt; + } + } + + /** + * @param rnd Random. + * @param b Buffer. + * @param cnt Entries count. + * @param cntr Current counter. + * @param filterRatio Filtered events ratio. + * @param threads Threads number. + * @throws Exception If failed. + */ + private void testBuffer(Random rnd, + final CacheContinuousQueryEventBuffer b, + int cnt, + long cntr, + float filterRatio, + int threads) + throws Exception + { + List<CacheContinuousQueryEntry> expEntries = new ArrayList<>(); + + List<CacheContinuousQueryEntry> entries = new ArrayList<>(); + + long filtered = b.currentFiltered(); + + for (int i = 0; i < cnt; i++) { + CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry( + 0, + EventType.CREATED, + null, + null, + null, + false, + 0, + cntr, + null, + (byte)0); + + entries.add(entry); + + if (rnd.nextFloat() < filterRatio) { + entry.markFiltered(); + + filtered++; + } + else { + CacheContinuousQueryEntry expEntry = new CacheContinuousQueryEntry( + 0, + EventType.CREATED, + null, + null, + null, + false, + 0, + cntr, + null, + (byte)0); + + expEntry.filteredCount(filtered); + + expEntries.add(expEntry); + + filtered = 0; + } + + cntr++; + } + + Collections.shuffle(entries, rnd); + + List<CacheContinuousQueryEntry> actualEntries = new ArrayList<>(expEntries.size()); + + if (threads == 1) { + for (int i = 0; i < entries.size(); i++) { + Object o = entries.get(i); + + Object res = b.processEntry((CacheContinuousQueryEntry)o, false); + + if (res != null) { + if (res instanceof CacheContinuousQueryEntry) + actualEntries.add((CacheContinuousQueryEntry)res); + else + actualEntries.addAll((List<CacheContinuousQueryEntry>)res); + } + } + } + else { + final CyclicBarrier barrier = new CyclicBarrier(threads); + + final ConcurrentLinkedQueue<CacheContinuousQueryEntry> q = new ConcurrentLinkedQueue<>(entries); + + final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> act0 = new ConcurrentSkipListMap<>(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + barrier.await(); + + Object o; + + while ((o = q.poll()) != null) { + Object res = b.processEntry((CacheContinuousQueryEntry)o, false); + + if (res != null) { + if (res instanceof CacheContinuousQueryEntry) + act0.put(((CacheContinuousQueryEntry)res).updateCounter(), (CacheContinuousQueryEntry)res); + else { + for (CacheContinuousQueryEntry e : ((List<CacheContinuousQueryEntry>)res)) + act0.put(e.updateCounter(), e); + } + } + } + + return null; + } + }, threads, "test"); + + actualEntries.addAll(act0.values()); + } + + assertEquals(expEntries.size(), actualEntries.size()); + + for (int i = 0; i < expEntries.size(); i++) { + CacheContinuousQueryEntry expEvt = expEntries.get(i); + CacheContinuousQueryEntry actualEvt = actualEntries.get(i); + + assertEquals(expEvt.updateCounter(), actualEvt.updateCounter()); + assertEquals(expEvt.filteredCount(), actualEvt.filteredCount()); + } + } +}
