IGNITE-426 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e2133fa5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e2133fa5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e2133fa5 Branch: refs/heads/ignite-426-2-reb Commit: e2133fa50c84e4ca1f9a81d9689ad45bf0123087 Parents: 0408ed8 Author: nikolay_tikhonov <[email protected]> Authored: Tue Sep 29 16:12:46 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Wed Oct 28 15:14:16 2015 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 8 +- .../continuous/CacheContinuousQueryEntry.java | 26 ++- .../CacheContinuousQueryFilteredEntry.java | 228 ------------------- .../continuous/CacheContinuousQueryHandler.java | 226 +++++++++--------- .../CacheContinuousQueryListener.java | 9 +- .../CacheContinuousQueryLostPartition.java | 72 +++--- .../continuous/CacheContinuousQueryManager.java | 15 +- ...acheContinuousQueryFailoverAbstractTest.java | 87 ++++++- 8 files changed, 282 insertions(+), 389 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e2133fa5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 6eb9e17..3474f84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -92,7 +92,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilteredEntry; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartition; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -693,16 +692,11 @@ public class GridIoMessageFactory implements MessageFactory { break; case 115: - msg = new CacheContinuousQueryFilteredEntry(); - - break; - - case 116: msg = new CacheContinuousQueryLostPartition(); break; - // [-3..112] - this + // [-3..115] - this // [120..123] - DR // [-4..-22] - SQL default: http://git-wip-us.apache.org/repos/asf/ignite/blob/e2133fa5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 470aa09..9e73142 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -83,6 +83,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { private long updateIdx; /** */ + private boolean filtered; + + /** */ @GridToStringInclude @GridDirectTransient private AffinityTopologyVersion topVer; @@ -152,6 +155,13 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** + * Mark this event as filtered. + */ + void markFiltered() { + filtered = true; + } + + /** * @return Update index. */ long updateIndex() { @@ -162,7 +172,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @return Filtered entry. */ boolean filtered() { - return false; + return filtered; } /** @@ -286,6 +296,12 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); + case 7: + if (!writer.writeBoolean("filtered", filtered)) + return false; + + writer.incrementState(); + } return true; @@ -359,6 +375,14 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); + case 7: + filtered = reader.readBoolean("filtered"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(CacheContinuousQueryEntry.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/e2133fa5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java deleted file mode 100644 index 14d8f51..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import java.nio.ByteBuffer; -import javax.cache.event.EventType; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.Nullable; - -/** - * Continuous query entry. - */ -public class CacheContinuousQueryFilteredEntry extends CacheContinuousQueryEntry { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private EventType evtType; - - /** Cache name. */ - private int cacheId; - - /** Partition. */ - private int part; - - /** Update index. */ - private long updateIdx; - - /** */ - @GridToStringInclude - @GridDirectTransient - private AffinityTopologyVersion topVer; - - /** - * Required by {@link Message}. - */ - public CacheContinuousQueryFilteredEntry() { - // No-op. - } - - /** - * @param e Cache continuous query entry. - */ - CacheContinuousQueryFilteredEntry(CacheContinuousQueryEntry e) { - this.cacheId = e.cacheId(); - this.evtType = e.eventType(); - this.part = e.partition(); - this.updateIdx = e.updateIndex(); - this.topVer = e.topologyVersion(); - } - - /** - * @return Topology version if applicable. - */ - @Nullable AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** - * @return Cache ID. - */ - int cacheId() { - return cacheId; - } - - /** - * @return Event type. - */ - EventType eventType() { - return evtType; - } - - /** - * @return Partition. - */ - int partition() { - return part; - } - - /** - * @return Update index. - */ - long updateIndex() { - return updateIdx; - } - - /** {@inheritDoc} */ - @Override boolean filtered() { - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 115; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeInt("cacheId", cacheId)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeByte("evtType", evtType != null ? (byte)evtType.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeInt("part", part)) - return false; - - writer.incrementState(); - - case 3: - if (!writer.writeLong("updateIdx", updateIdx)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - cacheId = reader.readInt("cacheId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - byte evtTypeOrd; - - evtTypeOrd = reader.readByte("evtType"); - - if (!reader.isLastRead()) - return false; - - evtType = CacheContinuousQueryEntry.eventTypeFromOrdinal(evtTypeOrd); - - reader.incrementState(); - - case 2: - part = reader.readInt("part"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 3: - updateIdx = reader.readLong("updateIdx"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } - - return reader.afterMessageRead(CacheContinuousQueryFilteredEntry.class); - } - - /** {@inheritDoc} */ - @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 4; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheContinuousQueryFilteredEntry.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e2133fa5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 750cded..e8c67ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -41,6 +41,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; @@ -83,6 +84,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** */ private static final int BACKUP_ACK_THRESHOLD = 100; + /** */ + private static final int QUERY_HOLE_THRESHOLD = 5; + /** Cache name. */ private String cacheName; @@ -123,15 +127,12 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { private transient Collection<CacheContinuousQueryEntry> backupQueue; /** */ - private transient Map<Integer, Long> rcvCntrs; + private boolean localCache; /** */ private transient ConcurrentMap<Integer, PartitionRecovery> rcvs; /** */ - private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter; - - /** */ private transient AcknowledgeBuffer ackBuf; /** */ @@ -187,16 +188,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.ignoreExpired = ignoreExpired; this.taskHash = taskHash; this.skipPrimaryCheck = skipPrimaryCheck; + this.localCache = locCache; - if (locCache) - dupEvtFilter = F.alwaysTrue(); - else { - rcvCntrs = new ConcurrentHashMap<>(); - - rcvs = new ConcurrentHashMap<>(); - - dupEvtFilter = new DuplicateEventFilter(); - } + rcvs = new ConcurrentHashMap<>(); cacheId = CU.cacheId(cacheName); } @@ -268,7 +262,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) return; - GridCacheContext<K, V> cctx = cacheContext(ctx); + final GridCacheContext<K, V> cctx = cacheContext(ctx); // Check that cache stopped. if (cctx == null) @@ -289,12 +283,53 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } try { - final CacheContinuousQueryEntry entry = notify ? evt.entry() : - new CacheContinuousQueryFilteredEntry(evt.entry()); + final CacheContinuousQueryEntry entry = evt.entry(); + + if (!notify) + entry.markFiltered(); if (primary || skipPrimaryCheck) { if (loc) { - if (dupEvtFilter.apply(entry)) { + if (!localCache) { + PartitionRecovery rcv = rcvs.get(entry.partition()); + + if (rcv == null) { + rcv = new PartitionRecovery(ctx.log(getClass())); + + PartitionRecovery oldRec = rcvs.putIfAbsent(entry.partition(), rcv); + + if (oldRec != null) + rcv = oldRec; + } + + rcv.add(entry); + + Collection<CacheContinuousQueryEntry> entries = rcv.entries(); + + if (!entries.isEmpty()) { + final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); + + Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries, + new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { + @Override public CacheEntryEvent<? extends K, ? extends V> apply( + CacheContinuousQueryEntry e) { + return new CacheContinuousQueryEvent<>(cache, cctx, e); + } + }, + new IgnitePredicate<CacheContinuousQueryEntry>() { + @Override public boolean apply(CacheContinuousQueryEntry entry) { + return !entry.filtered(); + } + } + ); + + locLsnr.onUpdated(evts); + + if (!skipPrimaryCheck) + sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); + } + } + else { locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); if (!skipPrimaryCheck) @@ -343,7 +378,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } - @Override public void partitionLost(String cacheName0, int partId) { + @Override public void partitionLost(int partId) { + assert rcvs != null; + + PartitionRecovery rcv = rcvs.get(partId); + + if (rcv != null) + rcv.reset(); + } + + @Override public void firePartitionLostEvent(String cacheName0, final int partId) { GridCacheContext<K, V> cctx = cacheContext(ctx); // Check that cache stopped. @@ -352,25 +396,33 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if ((cacheName == null && cacheName0 == null) || // Check default cache. (cacheName0 != null && cacheName != null && cacheName0.equals(cacheName))) { + ctx.closure().runLocalSafe(new Runnable() { + @Override public void run() { + GridCacheContext<K, V> cctx = cacheContext(ctx); - final CacheContinuousQueryEntry entry = - new CacheContinuousQueryLostPartition(cctx.cacheId(), partId); + CacheContinuousQueryLostPartition msg = new CacheContinuousQueryLostPartition( + routineId, + cctx.cacheId(), + partId); - try { - prepareEntry(cctx, nodeId, entry); + try { + cctx.io().send(nodeId, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + IgniteLogger log = ctx.log(getClass()); - ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); - } - catch (ClusterTopologyCheckedException ex) { - IgniteLogger log = ctx.log(getClass()); + if (log.isDebugEnabled()) + log.debug("Failed to send lost partition message, node left " + + "[msg=" + msg + ", nodeId=" + routineId + ']'); + } + catch (IgniteCheckedException e) { + IgniteLogger log = ctx.log(getClass()); - if (log.isDebugEnabled()) - log.debug("Failed to send event notification to node, node left cluster " + - "[node=" + nodeId + ", err=" + ex + ']'); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); - } + U.error(log, "Failed to send lost partition message " + + "[msg=" + msg + ", nodeId=" + routineId + ']', e); + } + } + }); } } @@ -537,14 +589,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { rec = oldRec; } - if (e instanceof CacheContinuousQueryLostPartition) - rec.reset(); - else { - rec.add(e); + rec.add(e); - if (!parts.containsKey(e.partition())) - parts.put(e.partition(), rec); - } + if (!parts.containsKey(e.partition())) + parts.put(e.partition(), rec); } Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>(); @@ -569,29 +617,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** - * @param e Entry. - * @return {@code True} if listener should be notified. - */ - private boolean notifyListener(CacheContinuousQueryEntry e) { - Integer part = e.partition(); - - Long cntr = rcvCntrs.get(part); - - if (cntr != null) { - long cntr0 = cntr; - - if (e.updateIndex() > cntr0) - rcvCntrs.put(part, e.updateIndex()); - else - return false; - } - else - rcvCntrs.put(part, e.updateIndex()); - - return true; - } - - /** * */ private static class PartitionRecovery { @@ -617,15 +642,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @param e Cache continuous qeury entry. */ public void add(CacheContinuousQueryEntry e) { - synchronized (pendingEnts) { - if (pendingEnts.containsKey(e.updateIndex()) || e.updateIndex() <= lastFiredEvt) - e.cacheId(); - //log.info("Skip duplicate continuous query entry. Entry: " + e); - else { - //log.info("Added continuous query entry. Entry: " + e); + assert e != null; + synchronized (pendingEnts) { + if (!pendingEnts.containsKey(e.updateIndex()) && e.updateIndex() > lastFiredEvt) pendingEnts.put(e.updateIndex(), e); - } + else if (log.isDebugEnabled()) + log.debug("Skip duplicate continuous query message: " + e); } } @@ -641,45 +664,53 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator(); - Map.Entry<Long, CacheContinuousQueryEntry> prev = null; - - Set<Long> rmvEnts = new HashSet<>(); + boolean fired = false; + // The elements are consistently. while (iter.hasNext()) { Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); - // The elements are consistently. if (e.getKey() == lastFiredEvt + 1) { ++lastFiredEvt; entries.add(e.getValue()); iter.remove(); + + fired = true; + } + } + + if (!fired && lastFiredEvt == 0 && pendingEnts.size() >= QUERY_HOLE_THRESHOLD) { + Long prevCnt = null; + + int orderedCnt = 0; + + for (Long cnt : pendingEnts.keySet()) { + if (prevCnt != null) { + if (prevCnt + 1 != cnt) + break; + else + ++orderedCnt; + } + + prevCnt = cnt; } - // Handle hole in sequence. - else if (prev != null && prev.getKey() + 1 == e.getKey()) { - entries.add(prev.getValue()); - lastFiredEvt = prev.getKey(); + if (orderedCnt >= QUERY_HOLE_THRESHOLD) { + iter = pendingEnts.entrySet().iterator(); - rmvEnts.add(prev.getKey()); + while (entries.size() < orderedCnt) { + Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); - if (!iter.hasNext()) { entries.add(e.getValue()); lastFiredEvt = e.getKey(); - rmvEnts.add(e.getKey()); + iter.remove(); } } - else if (prev != null) - break; - - prev = e; } - - for (Long rmKey : rmvEnts) - pendingEnts.remove(rmKey); } return entries; @@ -737,12 +768,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** {@inheritDoc} */ @Override public void partitionLost(String cacheName, int partId) { - if (this.cacheName == null) { - int z = 0; - - ++z; - } - if ((this.cacheName == null && cacheName == null) // Check default caches. || (cacheName != null && this.cacheName != null && cacheName.equals(this.cacheName))) { PartitionRecovery rcv = rcvs.get(partId); @@ -962,19 +987,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** - * - */ - private class DuplicateEventFilter implements IgnitePredicate<CacheContinuousQueryEntry> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public boolean apply(CacheContinuousQueryEntry e) { - return notifyListener(e); - } - } - - /** * Deployable object. */ private static class DeployableObject implements Externalizable { http://git-wip-us.apache.org/repos/asf/ignite/blob/e2133fa5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 735e808..a706105 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -60,7 +60,14 @@ interface CacheContinuousQueryListener<K, V> { * @param cacheName Cache name. * @param partId Partition ID. */ - public void partitionLost(String cacheName, int partId); + public void firePartitionLostEvent(String cacheName, int partId); + + /** + * Handle partition lost event. + * + * @param partId Partition ID. + */ + public void partitionLost(int partId); /** * Flushes backup queue. http://git-wip-us.apache.org/repos/asf/ignite/blob/e2133fa5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java index 734d072..eeb20cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java @@ -18,27 +18,22 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.nio.ByteBuffer; -import javax.cache.event.EventType; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.Nullable; /** * Continuous query entry. */ -public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry { +public class CacheContinuousQueryLostPartition extends GridCacheMessage { /** */ private static final long serialVersionUID = 0L; - /** Cache name. */ - private int cacheId; + /** Routine ID. */ + private UUID routineId; /** Partition. */ private int part; @@ -54,34 +49,38 @@ public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry * @param cacheId Cache ID. * @param part Partition ID. */ - CacheContinuousQueryLostPartition(int cacheId, int part) { + CacheContinuousQueryLostPartition(UUID routineId, int cacheId, int part) { + this.routineId = routineId; this.cacheId = cacheId; this.part = part; } /** - * @return Cache ID. + * @return Partition. */ - int cacheId() { - return cacheId; + int partition() { + return part; } /** - * @return Partition. + * @return Routine ID. */ - int partition() { - return part; + UUID routineId() { + return routineId; } /** {@inheritDoc} */ @Override public byte directType() { - return 116; + return 115; } /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); + if (!super.writeTo(buf, writer)) + return false; + if (!writer.isHeaderWritten()) { if (!writer.writeHeader(directType(), fieldsCount())) return false; @@ -90,17 +89,18 @@ public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry } switch (writer.state()) { - case 0: - if (!writer.writeInt("cacheId", cacheId)) + case 3: + if (!writer.writeInt("part", part)) return false; writer.incrementState(); - case 1: - if (!writer.writeInt("part", part)) + case 4: + if (!writer.writeUuid("routineId", routineId)) return false; writer.incrementState(); + } return true; @@ -113,44 +113,36 @@ public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry if (!reader.beforeMessageRead()) return false; + if (!super.readFrom(buf, reader)) + return false; + switch (reader.state()) { - case 0: - cacheId = reader.readInt("cacheId"); + case 3: + part = reader.readInt("part"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 1: - part = reader.readInt("part"); + case 4: + routineId = reader.readUuid("routineId"); if (!reader.isLastRead()) return false; - reader.incrementState(); } - return reader.afterMessageRead(CacheContinuousQueryLostPartition.class); - } - - /** {@inheritDoc} */ - @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException { - // No-op. + return true; } /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 2; + return 5; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheContinuousQueryLostPartition.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e2133fa5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index dedcd0a..bc68b58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -46,7 +46,6 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -128,6 +127,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } }); + cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryLostPartition.class, + new CI2<UUID, CacheContinuousQueryLostPartition>() { + @Override public void apply(UUID uuid, CacheContinuousQueryLostPartition msg) { + CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId()); + + if (lsnr != null) + lsnr.partitionLost(msg.partition()); + } + }); + cctx.time().schedule(new Runnable() { @Override public void run() { for (CacheContinuousQueryListener lsnr : lsnrs.values()) @@ -145,10 +154,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheRebalancingEvent evt0 = (CacheRebalancingEvent)evt; for (CacheContinuousQueryListener lsnr : lsnrs.values()) - lsnr.partitionLost(evt0.cacheName(), evt0.partition()); + lsnr.firePartitionLostEvent(evt0.cacheName(), evt0.partition()); for (CacheContinuousQueryListener lsnr : intLsnrs.values()) - lsnr.partitionLost(evt0.cacheName(), evt0.partition()); + lsnr.firePartitionLostEvent(evt0.cacheName(), evt0.partition()); } }, EVT_CACHE_REBALANCE_PART_DATA_LOST); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e2133fa5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index 3bba5e6..61fa6cd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -264,6 +264,84 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** * @throws Exception If failed. */ + public void testStartStopQuery() throws Exception { + this.backups = 1; + + final int SRV_NODES = 3; + + startGridsMultiThreaded(SRV_NODES); + + client = true; + + final Ignite qryClient = startGrid(SRV_NODES); + + client = false; + + IgniteCache<Object, Object> clnCache = qryClient.cache(null); + + Ignite igniteSrv = ignite(0); + + IgniteCache<Object, Object> srvCache = igniteSrv.cache(null); + + List<Integer> keys = testKeys(srvCache, 1); + + int keyCnt = keys.size(); + + for (int j = 0; j < 50; ++j) { + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + final TestLocalListener lsnr = new TestLocalListener(); + + qry.setLocalListener(lsnr); + + int keyIter = 0; + + for (; keyIter < keyCnt / 2; keyIter++) { + int key = keys.get(keyIter); + + clnCache.put(key, key); + } + + assert lsnr.evts.isEmpty(); + + QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry); + + Map<Object, T2<Object, Object>> updates = new HashMap<>(); + + final List<T3<Object, Object, Object>> expEvts = new ArrayList<>(); + + Affinity<Object> aff = affinity(srvCache); + + for (; keyIter < keys.size(); keyIter++) { + int key = keys.get(keyIter); + + log.info("Put [key=" + key + ", part=" + aff.partition(key) + ']'); + + T2<Object, Object> t = updates.get(key); + + if (t == null) { + updates.put(key, new T2<>((Object)key, null)); + + expEvts.add(new T3<>((Object)key, (Object)key, null)); + } + else { + updates.put(key, new T2<>((Object)key, (Object)key)); + + expEvts.add(new T3<>((Object)key, (Object)key, (Object)key)); + } + + srvCache.put(key, key); + } + + checkEvents(expEvts, lsnr); + + query.close(); + } + } + + /** + * @throws Exception If failed. + */ public void testLeftPrimaryAndBackupNodes() throws Exception { this.backups = 1; @@ -745,8 +823,13 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo * @param expEvts Expected events. * @param lsnr Listener. */ - private void checkEvents(List<T3<Object, Object, Object>> expEvts, TestLocalListener lsnr) { - assert lsnr.evts.size() == expEvts.size(); + private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final TestLocalListener lsnr) + throws Exception { + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr.evts.size() == expEvts.size(); + } + }, 2000L); for (T3<Object, Object, Object> exp : expEvts) { CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
