cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7bf63c0e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7bf63c0e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7bf63c0e Branch: refs/heads/ignite-5075-cc-debug Commit: 7bf63c0ee7d41317dc558d40267e40d9ed65fda3 Parents: 79e34c2 Author: sboikov <[email protected]> Authored: Thu May 25 11:30:17 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 25 11:30:17 2017 +0300 ---------------------------------------------------------------------- .../CacheContinuousQueryAcknowledgeBuffer.java | 120 ++++ .../CacheContinuousQueryDeployableObject.java | 110 ++++ .../CacheContinuousQueryEventBuffer.java | 91 ++- .../continuous/CacheContinuousQueryHandler.java | 602 +++---------------- .../CacheContinuousQueryHandlerV2.java | 6 +- .../CacheContinuousQueryPartitionRecovery.java | 252 ++++++++ .../continuous/GridContinuousProcessor.java | 7 +- .../CacheContinuousQueryEventBufferTest.java | 65 +- 8 files changed, 682 insertions(+), 571 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java new file mode 100644 index 0000000..c95dc42 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; +import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +/** */ +class CacheContinuousQueryAcknowledgeBuffer { + /** */ + private int size; + + /** */ + @GridToStringInclude + private Map<Integer, Long> updateCntrs = new HashMap<>(); + + /** */ + @GridToStringInclude + private Set<AffinityTopologyVersion> topVers = U.newHashSet(1); + + /** + * @param batch Batch. + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @SuppressWarnings("unchecked") + @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>onAcknowledged( + GridContinuousBatch batch) { + assert batch instanceof GridContinuousQueryBatch; + + size += ((GridContinuousQueryBatch)batch).entriesCount(); + + Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect(); + + for (CacheContinuousQueryEntry e : entries) + addEntry(e); + + return size >= CacheContinuousQueryHandler.BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; + } + + /** + * @param e Entry. + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> + onAcknowledged(CacheContinuousQueryEntry e) { + size++; + + addEntry(e); + + return size >= CacheContinuousQueryHandler.BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; + } + + /** + * @param e Entry. + */ + private void addEntry(CacheContinuousQueryEntry e) { + topVers.add(e.topologyVersion()); + + Long cntr0 = updateCntrs.get(e.partition()); + + if (cntr0 == null || e.updateCounter() > cntr0) + updateCntrs.put(e.partition(), e.updateCounter()); + } + + /** + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> + acknowledgeOnTimeout() { + return size > 0 ? acknowledgeData() : null; + } + + /** + * @return Tuple with acknowledge information. + */ + private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() { + assert size > 0; + + Map<Integer, Long> cntrs = new HashMap<>(updateCntrs); + + IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res = + new IgniteBiTuple<>(cntrs, topVers); + + topVers = U.newHashSet(1); + + size = 0; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryAcknowledgeBuffer.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java new file mode 100644 index 0000000..f888467 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.managers.deployment.GridDeployment; +import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; +import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Deployable object. + */ +class CacheContinuousQueryDeployableObject implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Serialized object. */ + private byte[] bytes; + + /** Deployment class name. */ + private String clsName; + + /** Deployment info. */ + private GridDeploymentInfo depInfo; + + /** + * Required by {@link Externalizable}. + */ + public CacheContinuousQueryDeployableObject() { + // No-op. + } + + /** + * @param obj Object. + * @param ctx Kernal context. + * @throws IgniteCheckedException In case of error. + */ + protected CacheContinuousQueryDeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException { + assert obj != null; + assert ctx != null; + + Class cls = U.detectClass(obj); + + clsName = cls.getName(); + + GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj); + + depInfo = new GridDeploymentInfoBean(dep); + + bytes = U.marshal(ctx, obj); + } + + /** + * @param nodeId Node ID. + * @param ctx Kernal context. + * @return Deserialized object. + * @throws IgniteCheckedException In case of error. + */ + <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { + assert ctx != null; + + GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, + depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); + + return U.unmarshal(ctx, bytes, U.resolveClassLoader(dep.classLoader(), ctx.config())); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeByteArray(out, bytes); + U.writeString(out, clsName); + out.writeObject(depInfo); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + bytes = U.readByteArray(in); + clsName = U.readString(in); + depInfo = (GridDeploymentInfo)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index a308e39..949ea67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -19,8 +19,11 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteSystemProperties; @@ -36,10 +39,52 @@ public class CacheContinuousQueryEventBuffer { IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 5); /** */ + protected final int part; + + /** + * @param part Partition number. + */ + CacheContinuousQueryEventBuffer(int part) { + this.part = part; + } + + /** */ private AtomicReference<Batch> curBatch = new AtomicReference<>(); /** */ - private ConcurrentSkipListMap<Long, Object> pending = new ConcurrentSkipListMap<>(); + private ConcurrentLinkedDeque<CacheContinuousQueryEntry> backupQ = new ConcurrentLinkedDeque<>(); + + /** */ + private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>(); + + /** + * @param updateCntr Acknowledged counter. + */ + void cleanupBackupQueue(Long updateCntr) { + Iterator<CacheContinuousQueryEntry> it = backupQ.iterator(); + + while (it.hasNext()) { + CacheContinuousQueryEntry backupEntry = it.next(); + + if (backupEntry.updateCounter() <= updateCntr) + it.remove(); + } + } + + /** + * @return Backup entries. + */ + @Nullable Collection<CacheContinuousQueryEntry> resetBackupQueue() { + if (!backupQ.isEmpty()) { + ConcurrentLinkedDeque<CacheContinuousQueryEntry> ret = this.backupQ; + + backupQ = new ConcurrentLinkedDeque<>(); + + return ret; + } + + return null; + } /** * @return Initial partition counter. @@ -61,26 +106,20 @@ public class CacheContinuousQueryEventBuffer { /** * @param e Entry to process. + * @param backup Backup entry flag. * @return Collected entries to pass to listener (single entry or entries list). */ - @Nullable Object processEntry(CacheContinuousQueryEntry e) { - return process0(e.updateCounter(), e); - } - - /** - * @param cntr Filtered counter. - * @return Collected entries to pass to listener (single entry or entries list). - */ - @Nullable Object processFiltered(long cntr) { - return process0(cntr, cntr); + @Nullable Object processEntry(CacheContinuousQueryEntry e, boolean backup) { + return process0(e.updateCounter(), e, backup); } /** + * @param backup Backup entry flag. * @param cntr Entry counter. * @param entry Entry. * @return Collected entries. */ - private Object process0(long cntr, Object entry) { + private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) { assert cntr >= 0 : cntr; Batch batch = initBatch(); @@ -88,13 +127,16 @@ public class CacheContinuousQueryEventBuffer { if (batch == null || cntr < batch.startCntr) { assert entry != null : cntr; + if (backup) + backupQ.add(entry); + return entry; } Object res = null; if (cntr <= batch.endCntr) - res = batch.processEvent0(null, cntr, entry); + res = batch.processEvent0(null, cntr, entry, backup); else pending.put(cntr, entry); @@ -104,7 +146,7 @@ public class CacheContinuousQueryEventBuffer { do { batch = batch0; - res = processPending(res, batch); + res = processPending(res, batch, backup); batch0 = curBatch.get(); } @@ -139,17 +181,18 @@ public class CacheContinuousQueryEventBuffer { /** * @param res Current result. * @param batch Current batch. + * @param backup Backup entry flag. * @return New result. */ - @Nullable private Object processPending(@Nullable Object res, Batch batch) { + @Nullable private Object processPending(@Nullable Object res, Batch batch, boolean backup) { if (pending.floorKey(batch.endCntr) != null) { - for (Map.Entry<Long, Object> p : pending.headMap(batch.endCntr, true).entrySet()) { + for (Map.Entry<Long, CacheContinuousQueryEntry> p : pending.headMap(batch.endCntr, true).entrySet()) { long cntr = p.getKey(); assert cntr >= batch.startCntr && cntr <= batch.endCntr : cntr; if (pending.remove(p.getKey()) != null) - res = batch.processEvent0(res, p.getKey(), p.getValue()); + res = batch.processEvent0(res, p.getKey(), p.getValue(), backup); } } @@ -195,13 +238,15 @@ public class CacheContinuousQueryEventBuffer { * @param res Current result. * @param cntr Event counter. * @param evt Event. + * @param backup Backup entry flag. * @return New result. */ @SuppressWarnings("unchecked") @Nullable private Object processEvent0( @Nullable Object res, long cntr, - Object evt) { + CacheContinuousQueryEntry evt, + boolean backup) { int pos = (int)(cntr - startCntr); synchronized (this) { @@ -224,9 +269,15 @@ public class CacheContinuousQueryEventBuffer { filtered = 0; - if (res == null) - res = evt0; + if (res == null) { + if (backup) + backupQ.add(evt0); + else + res = evt0; + } else { + assert !backup; + List<CacheContinuousQueryEntry> resList; if (res instanceof CacheContinuousQueryEntry) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index f7547f5..540f871 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -24,12 +24,9 @@ import java.io.ObjectOutput; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -46,13 +43,10 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; -import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; @@ -67,12 +61,10 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteAsyncCallback; @@ -80,7 +72,6 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; @@ -93,11 +84,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private static final long serialVersionUID = 0L; /** */ - private static final int BACKUP_ACK_THRESHOLD = + static final int BACKUP_ACK_THRESHOLD = IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD", 100); /** */ - private static final int LSNR_MAX_BUF_SIZE = + static final int LSNR_MAX_BUF_SIZE = IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE", 10_000); /** Cache name. */ @@ -113,7 +104,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private CacheEntryEventSerializableFilter<K, V> rmtFilter; /** Deployable object for filter. */ - private DeployableObject rmtFilterDep; + private CacheContinuousQueryDeployableObject rmtFilterDep; /** Internal flag. */ private boolean internal; @@ -136,9 +127,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** Whether to skip primary check for REPLICATED cache. */ private transient boolean skipPrimaryCheck; - /** Backup queue. */ - private transient volatile Collection<CacheContinuousQueryEntry> backupQueue; - /** */ private boolean locCache; @@ -146,13 +134,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private boolean keepBinary; /** */ - private transient ConcurrentMap<Integer, PartitionRecovery> rcvs; + private transient ConcurrentMap<Integer, CacheContinuousQueryPartitionRecovery> rcvs; /** */ private transient ConcurrentMap<Integer, CacheContinuousQueryEventBuffer> entryBufs; /** */ - private transient AcknowledgeBuffer ackBuf; + private transient CacheContinuousQueryAcknowledgeBuffer ackBuf; /** */ private transient int cacheId; @@ -167,6 +155,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private transient volatile AffinityTopologyVersion initTopVer; /** */ + private transient volatile boolean nodeLeft; + + /** */ private transient boolean ignoreClsNotFound; /** */ @@ -341,9 +332,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler entryBufs = new ConcurrentHashMap<>(); - backupQueue = new ConcurrentLinkedDeque8<>(); - - ackBuf = new AcknowledgeBuffer(); + ackBuf = new CacheContinuousQueryAcknowledgeBuffer(); rcvs = new ConcurrentHashMap<>(); @@ -413,7 +402,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler ctx.asyncCallbackPool().execute(clsr, evt.partitionId()); } else { - final boolean notify = filter(evt, primary); + final boolean notify = filter(evt); if (log.isDebugEnabled()) log.debug("Filter invoked for event [evt=" + evt + ", primary=" + primary @@ -433,6 +422,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler }, sync); } } + else + handleBackupEntry(cctx, evt.entry()); } } @@ -442,50 +433,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) { - Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; - - if (backupQueue0 != null) { - Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator(); - - while (it.hasNext()) { - CacheContinuousQueryEntry backupEntry = it.next(); - - Long updateCntr = updateCntrs.get(backupEntry.partition()); + for (Map.Entry<Integer, Long> e : updateCntrs.entrySet()) { + CacheContinuousQueryEventBuffer buf = entryBufs.get(e.getKey()); - if (updateCntr != null && backupEntry.updateCounter() <= updateCntr) - it.remove(); - } + if (buf != null) + buf.cleanupBackupQueue(e.getValue()); } } @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) { assert topVer != null; - Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; + try { + GridCacheContext<K, V> cctx = cacheContext(ctx); - if (backupQueue0 == null) - return; + ClusterNode node = ctx.discovery().node(nodeId); - try { - ClusterNode nodeId0 = ctx.discovery().node(nodeId); + for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) { + CacheContinuousQueryEventBuffer buf = bufE.getValue(); - if (nodeId0 != null) { - GridCacheContext<K, V> cctx = cacheContext(ctx); + Collection<CacheContinuousQueryEntry> backupQueue = buf.resetBackupQueue(); - for (CacheContinuousQueryEntry e : backupQueue0) { - if (!e.isFiltered()) - prepareEntry(cctx, nodeId, e); + if (backupQueue != null && node != null) { + for (CacheContinuousQueryEntry e : backupQueue) { + if (!e.isFiltered()) + prepareEntry(cctx, nodeId, e); - e.topologyVersion(topVer); - } + e.topologyVersion(topVer); + } - ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue0, topic); + ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic); + } } - else - // Node which start CQ leave topology. Not needed to put data to backup queue. - backupQueue = null; - - backupQueue0.clear(); } catch (IgniteCheckedException e) { U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), @@ -509,14 +488,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } @Override public void onPartitionEvicted(int part) { - Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; - - if (backupQueue0 != null) { - for (Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator(); it.hasNext(); ) { - if (it.next().partition() == part) - it.remove(); - } - } + entryBufs.remove(part); } @Override public boolean oldValueRequired() { @@ -743,17 +715,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); } - PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion()); + CacheContinuousQueryPartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion()); return rec.collectEntries(e, cctx, cache); } /** - * @param primary Primary. * @param evt Query event. * @return {@code True} if event passed filter otherwise {@code true}. */ - public boolean filter(CacheContinuousQueryEvent evt, boolean primary) { + public boolean filter(CacheContinuousQueryEvent evt) { CacheContinuousQueryEntry entry = evt.entry(); boolean notify = !entry.isFiltered(); @@ -769,15 +740,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (!notify) entry.markFiltered(); - if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) { - entry.markBackup(); - - Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; - - if (backupQueue0 != null) - backupQueue0.add(entry.forBackupQueue()); - } - return notify; } @@ -869,7 +831,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (internal) return; - for (PartitionRecovery rec : rcvs.values()) + for (CacheContinuousQueryPartitionRecovery rec : rcvs.values()) rec.resetTopologyCache(); } @@ -879,12 +841,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param topVer Topology version for current operation. * @return Partition recovery. */ - @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, + @NotNull private CacheContinuousQueryPartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId, AffinityTopologyVersion topVer) { assert topVer != null && topVer.topologyVersion() > 0 : topVer; - PartitionRecovery rec = rcvs.get(partId); + CacheContinuousQueryPartitionRecovery rec = rcvs.get(partId); if (rec == null) { T2<Long, Long> partCntrs = null; @@ -909,10 +871,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler else if (initUpdCntrs != null) partCntrs = initUpdCntrs.get(partId); - rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer, + rec = new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer, partCntrs != null ? partCntrs.get2() : null); - PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec); + CacheContinuousQueryPartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec); if (oldRec != null) rec = oldRec; @@ -924,6 +886,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** * @param cctx Cache context. * @param e Entry. + */ + private void handleBackupEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) { + if (internal || e.updateCounter() == -1L || nodeLeft) // Skip internal query and expire entries. + return; + + CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition()); + + buf.processEntry(e.forBackupQueue(), true); + } + + /** + * @param cctx Cache context. + * @param e Entry. * @return Entry. */ private Object handleEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) { @@ -942,12 +917,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (e.updateCounter() == -1L) return e; - CacheContinuousQueryEventBuffer buf = entryBufs.get(e.partition()); + CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition()); - if (buf == null) { - final int part = e.partition(); + return buf.processEntry(e, false); + } - buf = new CacheContinuousQueryEventBuffer() { + /** + * @param cctx Cache context. + * @param part Partition. + * @return Event buffer. + */ + private CacheContinuousQueryEventBuffer partitionBuffer(final GridCacheContext cctx, int part) { + CacheContinuousQueryEventBuffer buf = entryBufs.get(part); + + if (buf == null) { + buf = new CacheContinuousQueryEventBuffer(part) { @Override protected long currentPartitionCounter() { GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false); @@ -958,239 +942,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } }; - CacheContinuousQueryEventBuffer oldBuf = entryBufs.putIfAbsent(e.partition(), buf); + CacheContinuousQueryEventBuffer oldBuf = entryBufs.putIfAbsent(part, buf); if (oldBuf != null) buf = oldBuf; } - return buf.processEntry(e); - } - - /** - * - */ - private static class PartitionRecovery { - /** Event which means hole in sequence. */ - private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry(); - - /** */ - private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE; - - /** */ - private IgniteLogger log; - - /** */ - private long lastFiredEvt; - - /** */ - private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE; - - /** */ - private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); - - /** - * @param log Logger. - * @param topVer Topology version. - * @param initCntr Update counters. - */ - PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) { - this.log = log; - - if (initCntr != null) { - assert topVer.topologyVersion() > 0 : topVer; - - this.lastFiredEvt = initCntr; - - curTop = topVer; - } - } - - /** - * Resets cached topology. - */ - void resetTopologyCache() { - curTop = AffinityTopologyVersion.NONE; - } - - /** - * Add continuous entry. - * - * @param cctx Cache context. - * @param cache Cache. - * @param entry Cache continuous query entry. - * @return Collection entries which will be fired. This collection should contains only non-filtered events. - */ - <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries( - CacheContinuousQueryEntry entry, - GridCacheContext cctx, - IgniteCache cache - ) { - assert entry != null; - - if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. - assert entry.updateCounter() == 0L : entry; - - return F.<CacheEntryEvent<? extends K, ? extends V>> - asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); - } - - List<CacheEntryEvent<? extends K, ? extends V>> entries; - - synchronized (pendingEvts) { - if (log.isDebugEnabled()) { - log.debug("Handling event [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - } - - // Received first event. - if (curTop == AffinityTopologyVersion.NONE) { - lastFiredEvt = entry.updateCounter(); - - curTop = entry.topologyVersion(); - - if (log.isDebugEnabled()) { - log.debug("First event [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + ']'); - } - - return !entry.isFiltered() ? - F.<CacheEntryEvent<? extends K, ? extends V>> - asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) : - Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); - } - - if (curTop.compareTo(entry.topologyVersion()) < 0) { - if (entry.updateCounter() == 1L && !entry.isBackup()) { - entries = new ArrayList<>(pendingEvts.size()); - - for (CacheContinuousQueryEntry evt : pendingEvts.values()) { - if (evt != HOLE && !evt.isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt)); - } - - pendingEvts.clear(); - - curTop = entry.topologyVersion(); - - lastFiredEvt = entry.updateCounter(); - - if (!entry.isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); - - if (log.isDebugEnabled()) - log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - - return entries; - } - - curTop = entry.topologyVersion(); - } - - // Check duplicate. - if (entry.updateCounter() > lastFiredEvt) - pendingEvts.put(entry.updateCounter(), entry); - else { - if (log.isDebugEnabled()) - log.debug("Skip duplicate continuous query message: " + entry); - - return Collections.emptyList(); - } - - if (pendingEvts.isEmpty()) { - if (log.isDebugEnabled()) { - log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + ']'); - } - - return Collections.emptyList(); - } - - Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator(); - - entries = new ArrayList<>(); - - if (pendingEvts.size() >= MAX_BUFF_SIZE) { - if (log.isDebugEnabled()) { - log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - } - - LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() + - ", bufSize=" + MAX_BUFF_SIZE + - ", partId=" + entry.partition() + ']'); - - for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) { - Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); - - if (e.getValue() != HOLE && !e.getValue().isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); - - lastFiredEvt = e.getKey(); - - iter.remove(); - } - } - else { - while (iter.hasNext()) { - Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); - - CacheContinuousQueryEntry pending = e.getValue(); - - long filtered = pending.filteredCount(); - - boolean fire = e.getKey() == lastFiredEvt + 1;; - - if (!fire && filtered > 0) - fire = e.getKey() - filtered <= lastFiredEvt + 1; - - if (fire) { - lastFiredEvt = e.getKey(); - - if (e.getValue() != HOLE && !e.getValue().isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); - - iter.remove(); - } - else - break; - } - } - } - - if (log.isDebugEnabled()) { - log.debug("Will send to listener the following events [entries=" + entries + - ", lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - } - - return entries; - } + return buf; } /** {@inheritDoc} */ @Override public void onNodeLeft() { - Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; - - if (backupQueue0 != null) - backupQueue = null; + nodeLeft = true; } /** {@inheritDoc} */ @@ -1199,7 +962,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert ctx.config().isPeerClassLoadingEnabled(); if (rmtFilter != null && !U.isGrid(rmtFilter.getClass())) - rmtFilterDep = new DeployableObject(rmtFilter, ctx); + rmtFilterDep = new CacheContinuousQueryDeployableObject(rmtFilter, ctx); } /** {@inheritDoc} */ @@ -1320,7 +1083,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler boolean b = in.readBoolean(); if (b) - rmtFilterDep = (DeployableObject)in.readObject(); + rmtFilterDep = (CacheContinuousQueryDeployableObject)in.readObject(); else rmtFilter = (CacheEntryEventSerializableFilter<K, V>)in.readObject(); @@ -1345,95 +1108,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return ctx.cache().<K, V>context().cacheContext(cacheId); } - /** */ - private static class AcknowledgeBuffer { - /** */ - private int size; - - /** */ - @GridToStringInclude - private Map<Integer, Long> updateCntrs = new HashMap<>(); - - /** */ - @GridToStringInclude - private Set<AffinityTopologyVersion> topVers = U.newHashSet(1); - - /** - * @param batch Batch. - * @return Non-null tuple if acknowledge should be sent to backups. - */ - @SuppressWarnings("unchecked") - @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> - onAcknowledged(GridContinuousBatch batch) { - assert batch instanceof GridContinuousQueryBatch; - - size += ((GridContinuousQueryBatch)batch).entriesCount(); - - Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect(); - - for (CacheContinuousQueryEntry e : entries) - addEntry(e); - - return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; - } - - /** - * @param e Entry. - * @return Non-null tuple if acknowledge should be sent to backups. - */ - @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> - onAcknowledged(CacheContinuousQueryEntry e) { - size++; - - addEntry(e); - - return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; - } - - /** - * @param e Entry. - */ - private void addEntry(CacheContinuousQueryEntry e) { - topVers.add(e.topologyVersion()); - - Long cntr0 = updateCntrs.get(e.partition()); - - if (cntr0 == null || e.updateCounter() > cntr0) - updateCntrs.put(e.partition(), e.updateCounter()); - } - - /** - * @return Non-null tuple if acknowledge should be sent to backups. - */ - @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> - acknowledgeOnTimeout() { - return size > 0 ? acknowledgeData() : null; - } - - /** - * @return Tuple with acknowledge information. - */ - private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() { - assert size > 0; - - Map<Integer, Long> cntrs = new HashMap<>(updateCntrs); - - IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res = - new IgniteBiTuple<>(cntrs, topVers); - - topVers = U.newHashSet(1); - - size = 0; - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(AcknowledgeBuffer.class, this); - } - } - /** * */ @@ -1469,44 +1143,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** {@inheritDoc} */ @Override public void run() { - final boolean notify = filter(evt, primary); - - if (!primary()) - return; + final boolean notify = filter(evt); - if (fut == null) { - onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + if (primary || skipPrimaryCheck) { + if (fut == null) { + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); - return; - } + return; + } - if (fut.isDone()) { - if (fut.error() != null) - evt.entry().markFiltered(); + if (fut.isDone()) { + if (fut.error() != null) + evt.entry().markFiltered(); - onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); - } - else { - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - if (f.error() != null) - evt.entry().markFiltered(); - - ctx.asyncCallbackPool().execute(new Runnable() { - @Override public void run() { - onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); - } - }, evt.entry().partition()); - } - }); + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + } + else { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + if (f.error() != null) + evt.entry().markFiltered(); + + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + } + }, evt.entry().partition()); + } + }); + } } - } - - /** - * @return {@code True} if event fired on this node. - */ - private boolean primary() { - return primary || skipPrimaryCheck; + else + handleBackupEntry(cacheContext(ctx), evt.entry()); } /** {@inheritDoc} */ @@ -1515,82 +1183,4 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - /** - * Deployable object. - */ - protected static class DeployableObject implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Serialized object. */ - private byte[] bytes; - - /** Deployment class name. */ - private String clsName; - - /** Deployment info. */ - private GridDeploymentInfo depInfo; - - /** - * Required by {@link Externalizable}. - */ - public DeployableObject() { - // No-op. - } - - /** - * @param obj Object. - * @param ctx Kernal context. - * @throws IgniteCheckedException In case of error. - */ - protected DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException { - assert obj != null; - assert ctx != null; - - Class cls = U.detectClass(obj); - - clsName = cls.getName(); - - GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); - - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj); - - depInfo = new GridDeploymentInfoBean(dep); - - bytes = U.marshal(ctx, obj); - } - - /** - * @param nodeId Node ID. - * @param ctx Kernal context. - * @return Deserialized object. - * @throws IgniteCheckedException In case of error. - */ - <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { - assert ctx != null; - - GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, - depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); - - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); - - return U.unmarshal(ctx, bytes, U.resolveClassLoader(dep.classLoader(), ctx.config())); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeByteArray(out, bytes); - U.writeString(out, clsName); - out.writeObject(depInfo); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - bytes = U.readByteArray(in); - clsName = U.readString(in); - depInfo = (GridDeploymentInfo)in.readObject(); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java index 7aef4dd..e48d22e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java @@ -44,7 +44,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan private Factory<? extends CacheEntryEventFilter> rmtFilterFactory; /** Deployable object for filter factory. */ - private DeployableObject rmtFilterFactoryDep; + private CacheContinuousQueryDeployableObject rmtFilterFactoryDep; /** Event types for JCache API. */ private byte types; @@ -122,7 +122,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan super.p2pMarshal(ctx); if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass())) - rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx); + rmtFilterFactoryDep = new CacheContinuousQueryDeployableObject(rmtFilterFactory, ctx); } /** {@inheritDoc} */ @@ -167,7 +167,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan boolean b = in.readBoolean(); if (b) - rmtFilterFactoryDep = (DeployableObject)in.readObject(); + rmtFilterFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject(); else rmtFilterFactory = (Factory)in.readObject(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java new file mode 100644 index 0000000..534ce9c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import javax.cache.event.CacheEntryEvent; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +class CacheContinuousQueryPartitionRecovery { + /** Event which means hole in sequence. */ + private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry(); + + /** */ + private final static int MAX_BUFF_SIZE = CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE; + + /** */ + private IgniteLogger log; + + /** */ + private long lastFiredEvt; + + /** */ + private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE; + + /** */ + private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); + + /** + * @param log Logger. + * @param topVer Topology version. + * @param initCntr Update counters. + */ + CacheContinuousQueryPartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) { + this.log = log; + + if (initCntr != null) { + assert topVer.topologyVersion() > 0 : topVer; + + this.lastFiredEvt = initCntr; + + curTop = topVer; + } + } + + /** + * Resets cached topology. + */ + void resetTopologyCache() { + curTop = AffinityTopologyVersion.NONE; + } + + /** + * Add continuous entry. + * + * @param cctx Cache context. + * @param cache Cache. + * @param entry Cache continuous query entry. + * @return Collection entries which will be fired. This collection should contains only non-filtered events. + */ + <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries( + CacheContinuousQueryEntry entry, + GridCacheContext cctx, + IgniteCache cache + ) { + assert entry != null; + + if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. + assert entry.updateCounter() == 0L : entry; + + return F.<CacheEntryEvent<? extends K, ? extends V>> + asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); + } + + List<CacheEntryEvent<? extends K, ? extends V>> entries; + + synchronized (pendingEvts) { + if (log.isDebugEnabled()) { + log.debug("Handling event [lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + + ", pendingEvts=" + pendingEvts + ']'); + } + + // Received first event. + if (curTop == AffinityTopologyVersion.NONE) { + lastFiredEvt = entry.updateCounter(); + + curTop = entry.topologyVersion(); + + if (log.isDebugEnabled()) { + log.debug("First event [lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + ']'); + } + + return !entry.isFiltered() ? + F.<CacheEntryEvent<? extends K, ? extends V>> + asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) : + Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); + } + + if (curTop.compareTo(entry.topologyVersion()) < 0) { + if (entry.updateCounter() == 1L && !entry.isBackup()) { + entries = new ArrayList<>(pendingEvts.size()); + + for (CacheContinuousQueryEntry evt : pendingEvts.values()) { + if (evt != HOLE && !evt.isFiltered()) + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt)); + } + + pendingEvts.clear(); + + curTop = entry.topologyVersion(); + + lastFiredEvt = entry.updateCounter(); + + if (!entry.isFiltered()) + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); + + if (log.isDebugEnabled()) + log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + + ", pendingEvts=" + pendingEvts + ']'); + + return entries; + } + + curTop = entry.topologyVersion(); + } + + // Check duplicate. + if (entry.updateCounter() > lastFiredEvt) + pendingEvts.put(entry.updateCounter(), entry); + else { + if (log.isDebugEnabled()) + log.debug("Skip duplicate continuous query message: " + entry); + + return Collections.emptyList(); + } + + if (pendingEvts.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + ']'); + } + + return Collections.emptyList(); + } + + Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator(); + + entries = new ArrayList<>(); + + if (pendingEvts.size() >= MAX_BUFF_SIZE) { + if (log.isDebugEnabled()) { + log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + + ", pendingEvts=" + pendingEvts + ']'); + } + + LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() + + ", bufSize=" + MAX_BUFF_SIZE + + ", partId=" + entry.partition() + ']'); + + for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) { + Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); + + if (e.getValue() != HOLE && !e.getValue().isFiltered()) + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); + + lastFiredEvt = e.getKey(); + + iter.remove(); + } + } + else { + while (iter.hasNext()) { + Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); + + CacheContinuousQueryEntry pending = e.getValue(); + + long filtered = pending.filteredCount(); + + boolean fire = e.getKey() == lastFiredEvt + 1;; + + if (!fire && filtered > 0) + fire = e.getKey() - filtered <= lastFiredEvt + 1; + + if (fire) { + lastFiredEvt = e.getKey(); + + if (e.getValue() != HOLE && !e.getValue().isFiltered()) + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); + + iter.remove(); + } + else + break; + } + } + } + + if (log.isDebugEnabled()) { + log.debug("Will send to listener the following events [entries=" + entries + + ", lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + + ", pendingEvts=" + pendingEvts + ']'); + } + + return entries; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index da951f2..a72dcd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -75,7 +75,6 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; @@ -872,10 +871,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { sendNotification(nodeId, routineId, null, toSnd, orderedTopic, true, null); } else { - LocalRoutineInfo localRoutineInfo = locInfos.get(routineId); + LocalRoutineInfo locRoutineInfo = locInfos.get(routineId); - if (localRoutineInfo != null) - localRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx); + if (locRoutineInfo != null) + locRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java index bc32e00..4710593 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java @@ -66,14 +66,14 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest for (int i = 0; i < 10; i++) { int cnt = rnd.nextInt(10_000) + 1; - testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.5f, threads); - testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.9f, threads); - testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.99f, threads); - testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.01f, threads); - testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.5f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.9f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.99f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.01f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.f, threads); } - CacheContinuousQueryEventBuffer b = new CacheContinuousQueryEventBuffer(); + CacheContinuousQueryEventBuffer b = new CacheContinuousQueryEventBuffer(0); long cntr = 1; @@ -106,32 +106,31 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest { List<CacheContinuousQueryEntry> expEntries = new ArrayList<>(); - List<Object> entries = new ArrayList<>(); + List<CacheContinuousQueryEntry> entries = new ArrayList<>(); long filtered = b.currentFiltered(); for (int i = 0; i < cnt; i++) { - if (rnd.nextFloat() < filterRatio) { - entries.add(cntr); + CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry( + 0, + EventType.CREATED, + null, + null, + null, + false, + 0, + cntr, + null); + - cntr++; + entries.add(entry); + + if (rnd.nextFloat() < filterRatio) { + entry.markFiltered(); filtered++; } else { - CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry( - 0, - EventType.CREATED, - null, - null, - null, - false, - 0, - cntr, - null); - - entries.add(entry); - CacheContinuousQueryEntry expEntry = new CacheContinuousQueryEntry( 0, EventType.CREATED, @@ -145,12 +144,12 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest expEntry.filteredCount(filtered); - cntr++; - expEntries.add(expEntry); filtered = 0; } + + cntr++; } Collections.shuffle(entries, rnd); @@ -161,12 +160,7 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest for (int i = 0; i < entries.size(); i++) { Object o = entries.get(i); - Object res; - - if (o instanceof Long) - res = b.processFiltered((Long)o); - else - res = b.processEntry((CacheContinuousQueryEntry)o); + Object res = b.processEntry((CacheContinuousQueryEntry)o, false); if (res != null) { if (res instanceof CacheContinuousQueryEntry) @@ -179,7 +173,7 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest else { final CyclicBarrier barrier = new CyclicBarrier(threads); - final ConcurrentLinkedQueue<Object> q = new ConcurrentLinkedQueue<>(entries); + final ConcurrentLinkedQueue<CacheContinuousQueryEntry> q = new ConcurrentLinkedQueue<>(entries); final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> act0 = new ConcurrentSkipListMap<>(); @@ -190,12 +184,7 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest Object o; while ((o = q.poll()) != null) { - Object res; - - if (o instanceof Long) - res = b.processFiltered((Long)o); - else - res = b.processEntry((CacheContinuousQueryEntry)o); + Object res = b.processEntry((CacheContinuousQueryEntry)o, false); if (res != null) { if (res instanceof CacheContinuousQueryEntry)
