http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 4f783db..750cded 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -21,13 +21,19 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.event.EventType; @@ -53,6 +59,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import 
org.apache.ignite.internal.util.typedef.internal.CU; @@ -119,6 +126,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { private transient Map<Integer, Long> rcvCntrs; /** */ + private transient ConcurrentMap<Integer, PartitionRecovery> rcvs; + + /** */ private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter; /** */ @@ -183,6 +193,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { else { rcvCntrs = new ConcurrentHashMap<>(); + rcvs = new ConcurrentHashMap<>(); + dupEvtFilter = new DuplicateEventFilter(); } @@ -258,6 +270,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { GridCacheContext<K, V> cctx = cacheContext(ctx); + // Check that cache stopped. + if (cctx == null) + return; + // skipPrimaryCheck is set only when listen locally for replicated cache events. assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); @@ -272,27 +288,78 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } - if (notify) { - try { - final CacheContinuousQueryEntry entry = evt.entry(); + try { + final CacheContinuousQueryEntry entry = notify ? evt.entry() : + new CacheContinuousQueryFilteredEntry(evt.entry()); - if (primary || skipPrimaryCheck) { - if (loc) { - if (dupEvtFilter.apply(entry)) { - locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); + if (primary || skipPrimaryCheck) { + if (loc) { + if (dupEvtFilter.apply(entry)) { + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt)); - if (!skipPrimaryCheck) - sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); - } + if (!skipPrimaryCheck) + sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); } - else { - prepareEntry(cctx, nodeId, entry); + } + else { + prepareEntry(cctx, nodeId, entry); - ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); - } + ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); } - else - backupQueue.add(entry); + } + else + backupQueue.add(entry); + } + catch (ClusterTopologyCheckedException ex) { + IgniteLogger log = ctx.log(getClass()); + + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + } + + if (recordIgniteEvt && notify) { + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.CONTINUOUS.name(), + cacheName, + null, + null, + null, + rmtFilter, + null, + nodeId, + taskName(), + evt.getKey(), + evt.getValue(), + evt.getOldValue(), + null + )); + } + } + + @Override public void partitionLost(String cacheName0, int partId) { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + // Check that cache stopped. + if (cctx == null) + return; + + if ((cacheName == null && cacheName0 == null) || // Check default cache. 
+ (cacheName0 != null && cacheName != null && cacheName0.equals(cacheName))) { + + final CacheContinuousQueryEntry entry = + new CacheContinuousQueryLostPartition(cctx.cacheId(), partId); + + try { + prepareEntry(cctx, nodeId, entry); + + ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); } catch (ClusterTopologyCheckedException ex) { IgniteLogger log = ctx.log(getClass()); @@ -304,27 +371,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { catch (IgniteCheckedException ex) { U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); } - - if (recordIgniteEvt) { - ctx.event().record(new CacheQueryReadEvent<>( - ctx.discovery().localNode(), - "Continuous query executed.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.CONTINUOUS.name(), - cacheName, - null, - null, - null, - rmtFilter, - null, - nodeId, - taskName(), - evt.getKey(), - evt.getValue(), - evt.getOldValue(), - null - )); - } } } @@ -476,13 +522,47 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); - Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries, + Map<Integer, PartitionRecovery> parts = new HashMap<>(); + + for (CacheContinuousQueryEntry e : entries) { + PartitionRecovery rec = parts.containsKey(e.partition()) ? 
+ parts.get(e.partition()) : rcvs.get(e.partition()); + + if (rec == null) { + rec = new PartitionRecovery(ctx.log(getClass())); + + PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec); + + if (oldRec != null) + rec = oldRec; + } + + if (e instanceof CacheContinuousQueryLostPartition) + rec.reset(); + else { + rec.add(e); + + if (!parts.containsKey(e.partition())) + parts.put(e.partition(), rec); + } + } + + Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>(); + + for (PartitionRecovery rec : parts.values()) + entries0.addAll(rec.entries()); + + Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { return new CacheContinuousQueryEvent<>(cache, cctx, e); } }, - dupEvtFilter + new IgnitePredicate<CacheContinuousQueryEntry>() { + @Override public boolean apply(CacheContinuousQueryEntry entry) { + return !entry.filtered(); + } + } ); locLsnr.onUpdated(evts); @@ -500,12 +580,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (cntr != null) { long cntr0 = cntr; - if (e.updateIndex() > cntr0) { - // TODO IGNITE-426: remove assert. - assert e.updateIndex() == cntr0 + 1 : "Invalid entry [cntr=" + cntr + ", e=" + e + ']'; - + if (e.updateIndex() > cntr0) rcvCntrs.put(part, e.updateIndex()); - } else return false; } @@ -515,6 +591,119 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { return true; } + /** + * + */ + private static class PartitionRecovery { + /** */ + private IgniteLogger log; + + /** */ + private long lastFiredEvt = 0; + + /** */ + private final Map<Long, CacheContinuousQueryEntry> pendingEnts = new TreeMap<>(); + + /** + * @param log Logger. + */ + public PartitionRecovery(IgniteLogger log) { + this.log = log; + } + + /** + * Add continuous entry. 
+ * + * @param e Cache continuous qeury entry. + */ + public void add(CacheContinuousQueryEntry e) { + synchronized (pendingEnts) { + if (pendingEnts.containsKey(e.updateIndex()) || e.updateIndex() <= lastFiredEvt) + e.cacheId(); + //log.info("Skip duplicate continuous query entry. Entry: " + e); + else { + //log.info("Added continuous query entry. Entry: " + e); + + pendingEnts.put(e.updateIndex(), e); + } + } + } + + /** + * @return Ordered continuous query entries. + */ + public Collection<CacheContinuousQueryEntry> entries() { + List<CacheContinuousQueryEntry> entries = new ArrayList<>(); + + synchronized (pendingEnts) { + if (pendingEnts.isEmpty()) + return Collections.emptyList(); + + Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator(); + + Map.Entry<Long, CacheContinuousQueryEntry> prev = null; + + Set<Long> rmvEnts = new HashSet<>(); + + while (iter.hasNext()) { + Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); + + // The elements are consistently. + if (e.getKey() == lastFiredEvt + 1) { + ++lastFiredEvt; + + entries.add(e.getValue()); + + iter.remove(); + } + // Handle hole in sequence. + else if (prev != null && prev.getKey() + 1 == e.getKey()) { + entries.add(prev.getValue()); + + lastFiredEvt = prev.getKey(); + + rmvEnts.add(prev.getKey()); + + if (!iter.hasNext()) { + entries.add(e.getValue()); + + lastFiredEvt = e.getKey(); + + rmvEnts.add(e.getKey()); + } + } + else if (prev != null) + break; + + prev = e; + } + + for (Long rmKey : rmvEnts) + pendingEnts.remove(rmKey); + } + + return entries; + } + + /** + * Reset internal state. 
+ */ + public void reset() { + synchronized (pendingEnts) { + Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator(); + + while (iter.hasNext()) { + Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); + + if (e.getKey() >= lastFiredEvt) + iter.remove(); + } + + lastFiredEvt = 0; + } + } + } + /** {@inheritDoc} */ @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { assert ctx != null; @@ -546,6 +735,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx); } + /** {@inheritDoc} */ + @Override public void partitionLost(String cacheName, int partId) { + if (this.cacheName == null) { + int z = 0; + + ++z; + } + + if ((this.cacheName == null && cacheName == null) // Check default caches. + || (cacheName != null && this.cacheName != null && cacheName.equals(this.cacheName))) { + PartitionRecovery rcv = rcvs.get(partId); + + if (rcv != null) + rcv.reset(); + } + } + /** * @param t Acknowledge information. * @param routineId Routine ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 2f9e111..735e808 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -55,6 +55,14 @@ interface CacheContinuousQueryListener<K, V> { public void cleanupBackupQueue(Map<Integer, Long> updateIdxs); /** + * Fire event that partition lost. + * + * @param cacheName Cache name. + * @param partId Partition ID. + */ + public void partitionLost(String cacheName, int partId); + + /** * Flushes backup queue. * * @param ctx Context. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java new file mode 100644 index 0000000..734d072 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.nio.ByteBuffer; +import javax.cache.event.EventType; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * Continuous query entry. + */ +public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache name. */ + private int cacheId; + + /** Partition. */ + private int part; + + /** + * Required by {@link Message}. + */ + public CacheContinuousQueryLostPartition() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param part Partition ID. + */ + CacheContinuousQueryLostPartition(int cacheId, int part) { + this.cacheId = cacheId; + this.part = part; + } + + /** + * @return Cache ID. + */ + int cacheId() { + return cacheId; + } + + /** + * @return Partition. 
+ */ + int partition() { + return part; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 116; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt("cacheId", cacheId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeInt("part", part)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cacheId = reader.readInt("cacheId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + part = reader.readInt("part"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(CacheContinuousQueryLostPartition.class); + } + + /** {@inheritDoc} */ + @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryLostPartition.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index f0e9c0b..dedcd0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -46,6 +46,10 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.events.CacheRebalancingEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -65,6 +69,7 @@ import static javax.cache.event.EventType.EXPIRED; import static javax.cache.event.EventType.REMOVED; import static javax.cache.event.EventType.UPDATED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; +import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; /** @@ -132,6 +137,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext()); } }, BACKUP_ACK_FREQ, BACKUP_ACK_FREQ); + + cctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + assert evt instanceof CacheRebalancingEvent; + + CacheRebalancingEvent evt0 = (CacheRebalancingEvent)evt; + + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + lsnr.partitionLost(evt0.cacheName(), evt0.partition()); + + for (CacheContinuousQueryListener lsnr : intLsnrs.values()) + lsnr.partitionLost(evt0.cacheName(), evt0.partition()); + } + }, EVT_CACHE_REBALANCE_PART_DATA_LOST); } /** {@inheritDoc} */ @@ -664,7 +683,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { fltr = (CacheEntryEventFilter) cfg.getCacheEntryEventFilterFactory().create(); if (!(fltr instanceof Serializable)) - throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: " + fltr); + throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: " + + fltr); } CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types); http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java index 2fef161..67b8c82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java @@ -1,7 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.ignite.internal.processors.continuous; +import java.util.Collection; + /** - * Created by Nikolay on 02.09.2015. + * Continuous routine batch. */ public interface GridContinuousBatch { + /** + * Adds element to this batch. + * + * @param obj Element to add. + */ + public void add(Object obj); + + /** + * Collects elements that are currently in this batch. + * + * @return Elements in this batch. + */ + public Collection<Object> collect(); + + /** + * @return Current batch size. 
+ */ + public int size(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java index 8e29e29..4540de1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java @@ -1,7 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.ignite.internal.processors.continuous; +import java.util.Collection; +import org.jsr166.ConcurrentLinkedDeque8; + /** - * Created by Nikolay on 02.09.2015. + * Continuous routine batch adapter. */ -public class GridContinuousBatchAdapter { +public class GridContinuousBatchAdapter implements GridContinuousBatch { + /** Buffer. 
*/ + private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>(); + + /** {@inheritDoc} */ + @Override public void add(Object obj) { + assert obj != null; + + buf.add(obj); + } + + /** {@inheritDoc} */ + @Override public Collection<Object> collect() { + return buf; + } + + /** {@inheritDoc} */ + @Override public int size() { + return buf.sizex(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 30e596a..975cd2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -98,6 +98,28 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException; /** + * Creates new batch. + * + * @return New batch. + */ + public GridContinuousBatch createBatch(); + + /** + * Called when ack for a batch is received from client. + * + * @param routineId Routine ID. + * @param batch Acknowledged batch. + * @param ctx Kernal context. + */ + public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx); + + /** + * @param cacheName Cache name. + * @param partId Partition ID. + */ + public void partitionLost(String cacheName, int partId); + + /** * @return Topic for ordered notifications. If {@code null}, notifications * will be sent in non-ordered messages. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 15c9dd2..c7676d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -35,7 +35,9 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; @@ -57,12 +59,14 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import 
org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; @@ -72,6 +76,7 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index abb2767..9ee6fe7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -529,7 +529,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr boolean conflictResolve, boolean intercept, UUID subjId, - String taskName) throws IgniteCheckedException, + String taskName, + @Nullable CacheObject prevVal, + @Nullable Long updateIdx) throws IgniteCheckedException, GridCacheEntryRemovedException { assert false; http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index ed856a5..3bba5e6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -25,23 +25,28 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import javax.cache.Cache; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.ContinuousQuery; @@ -59,12 +64,17 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; 
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.PAX;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -254,6 +264,302 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     /**
      * @throws Exception If failed.
      */
+    public void testLeftPrimaryAndBackupNodes() throws Exception {
+        this.backups = 1;
+
+        final int SRV_NODES = 3;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        final Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        final TestLocalListener lsnr = new TestLocalListener();
+
+        qry.setLocalListener(lsnr);
+
+        qry.setRemoteFilter(lsnr);
+
+        IgniteCache<Object, Object> clnCache = qryClient.cache(null);
+
+        QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
+
+        Ignite igniteSrv = ignite(0);
+
+        IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
+
+        Affinity<Object> aff = affinity(srvCache);
+
+        List<Integer> keys = testKeys(srvCache, 1);
+
+        Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(aff.partition(keys.get(0)));
+
+        Collection<UUID> ids = F.transform(nodes, new C1<ClusterNode, UUID>() {
+            @Override public UUID apply(ClusterNode node) {
+                return node.id();
+            }
+        });
+
+        int keyIter = 0;
+
+        boolean filtered = false;
+
+        Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+        final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+        for (; keyIter < keys.size() / 2; keyIter++) {
+            int key = keys.get(keyIter);
+
+            log.info("Put [key=" + key + ", part=" + aff.partition(key)
+                + ", filtered=" + filtered + ']');
+
+            T2<Object, Object> t = updates.get(key);
+
+            Integer val = filtered ?
+                (key % 2 == 0 ? key + 1 : key) :
+                key * 2;
+
+            if (t == null) {
+                updates.put(key, new T2<>((Object)val, null));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, null));
+            }
+            else {
+                updates.put(key, new T2<>((Object)val, (Object)key));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+            }
+
+            srvCache.put(key, val);
+
+            filtered = !filtered;
+        }
+
+        checkEvents(expEvts, lsnr);
+
+        List<Thread> stopThreads = new ArrayList<>(3);
+
+        // Drop all outgoing messages of every node owning the partition (primary and backups), then stop it.
+        for (int i = 0; i < SRV_NODES; i++) {
+            Ignite ignite = ignite(i);
+
+            if (ids.contains(ignite.cluster().localNode().id())) {
+                final int i0 = i;
+
+                TestCommunicationSpi spi = (TestCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+                spi.skipAllMsg = true;
+
+                stopThreads.add(new Thread() {
+                    @Override public void run() {
+                        stopGrid(i0, true);
+                    }
+                });
+            }
+        }
+
+        // Stop the owners concurrently and wait until every stop thread has finished.
+        for (Thread t : stopThreads)
+            t.start();
+
+        for (Thread t : stopThreads)
+            t.join();
+
+        assertTrue(GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                // (SRV_NODES + 1 client node) - 1 primary - backup nodes.
+                return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node */)
+                    - 1 /** Primary node */ - backups;
+            }
+        }, 10000L)); // Must not be a plain 'assert': the wait has to run even when assertions are disabled.
+
+        for (; keyIter < keys.size(); keyIter++) {
+            int key = keys.get(keyIter);
+
+            log.info("Put [key=" + key + ", filtered=" + filtered + ']');
+
+            T2<Object, Object> t = updates.get(key);
+
+            Integer val = filtered ?
+                (key % 2 == 0 ? key + 1 : key) :
+                key * 2;
+
+            if (t == null) {
+                updates.put(key, new T2<>((Object)val, null));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, null));
+            }
+            else {
+                updates.put(key, new T2<>((Object)val, (Object)key));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+            }
+
+            clnCache.put(key, val);
+
+            filtered = !filtered;
+        }
+
+        checkEvents(expEvts, lsnr);
+
+        query.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoteFilter() throws Exception {
+        this.backups = 2;
+
+        final int SRV_NODES = 4;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+        if (cacheMode() != REPLICATED)
+            assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
+
+        Affinity<Object> aff = qryClient.affinity(null);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        final TestLocalListener lsnr = new TestLocalListener();
+
+        qry.setLocalListener(lsnr);
+
+        qry.setRemoteFilter(lsnr);
+
+        int PARTS = 10;
+
+        QueryCursor<?> cur = qryClientCache.query(qry);
+
+        Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+        final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+        for (int i = 0; i < SRV_NODES - 1; i++) {
+            log.info("Stop iteration: " + i);
+
+            TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+
+            Ignite ignite = ignite(i);
+
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+
+            List<Integer> keys = testKeys(cache, PARTS);
+
+            boolean first = true;
+
+            boolean filtered = false;
+
+            for (Integer key : keys) {
+                log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key)
+                    + ", filtered=" + filtered + ']');
+
+                T2<Object, Object> t = updates.get(key);
+
+                Integer val = filtered ?
+                    (key % 2 == 0 ? key + 1 : key) :
+                    key * 2;
+
+                if (t == null) {
+                    updates.put(key, new T2<>((Object)val, null));
+
+                    if (!filtered)
+                        expEvts.add(new T3<>((Object)key, (Object)val, null));
+                }
+                else {
+                    updates.put(key, new T2<>((Object)val, (Object)key));
+
+                    if (!filtered)
+                        expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+                }
+
+                cache.put(key, val);
+
+                if (first) {
+                    spi.skipMsg = true; // Set right after the first put; the test SPI checks it for continuous messages.
+
+                    first = false;
+                }
+
+                filtered = !filtered;
+            }
+
+            stopGrid(i);
+
+            boolean check = GridTestUtils.waitForCondition(new PAX() {
+                @Override public boolean applyx() throws IgniteCheckedException {
+                    return expEvts.size() == lsnr.keys.size();
+                }
+            }, 5000L);
+
+            if (!check) {
+                Set<Integer> keys0 = new HashSet<>(keys);
+
+                keys0.removeAll(lsnr.keys);
+
+                log.info("Missed events for keys: " + keys0);
+
+                fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + keys0.size() + ']');
+            }
+
+            checkEvents(expEvts, lsnr);
+        }
+
+        cur.close();
+    }
+
+    /**
+     * Local listener that is also installed as the remote filter: records delivered events, passes even values only.
+     */
+    public static class TestLocalListener implements CacheEntryUpdatedListener<Object, Object>,
+        CacheEntryEventSerializableFilter<Object, Object> {
+        /** Keys of all events delivered to {@link #onUpdated(Iterable)}. */
+        GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
+
+        /** Most recently delivered event for each key. */
+        private final ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> events) throws CacheEntryListenerException {
+            for (CacheEntryEvent<?, ?> e : events) {
+                System.err.println("Update entry: " + e);
+
+                Integer key = (Integer)e.getKey();
+
+                keys.add(key);
+
+                evts.put(key, e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> e) throws CacheEntryListenerException {
+            return (Integer)e.getValue() % 2 == 0;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testThreeBackups() throws Exception {
         if (cacheMode() == REPLICATED)
             return;
@@ -261,6 +567,11 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         checkBackupQueue(3, false);
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean isDebug() {
+        return true;
+    }
+
     /**
      * @param backups Number of backups.
      * @param updateFromClient If {@code true} executes cache update from client node.
@@ -423,7 +734,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
             assertNotNull("No event for key: " + exp.get1(), e);
             assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
-            assertEquals("Unexpected old value: " + e, exp.get3(), e.getOldValue());
         }
 
         expEvts.clear();
@@ -432,6 +742,26 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     }
 
     /**
+     * @param expEvts Expected events as (key, value, old value); only key and value are checked. Cleared on return.
+     * @param lsnr Listener whose recorded events are verified; its recorded events and keys are cleared on return.
+     */
+    private void checkEvents(List<T3<Object, Object, Object>> expEvts, TestLocalListener lsnr) {
+        assertEquals("Unexpected number of events", expEvts.size(), lsnr.evts.size());
+
+        for (T3<Object, Object, Object> exp : expEvts) {
+            CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
+
+            assertNotNull("No event for key: " + exp.get1(), e);
+            assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
+        }
+
+        expEvts.clear();
+
+        lsnr.evts.clear();
+        lsnr.keys.clear();
+    }
+
+    /**
      * @param cache Cache.
      * @param parts Number of partitions.
      * @return Keys.
@@ -447,7 +777,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo int[] nodeParts = aff.primaryPartitions(node); - final int KEYS_PER_PART = 3; + final int KEYS_PER_PART = 50; for (int i = 0; i < parts; i++) { int part = nodeParts[i]; @@ -919,7 +1249,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo assertEquals(key, rcvdEvt.getKey()); assertEquals(expEvt.get1(), rcvdEvt.getValue()); - assertEquals(expEvt.get2(), rcvdEvt.getOldValue()); } } } @@ -1012,7 +1341,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo private final ConcurrentHashMap<Integer, List<CacheEntryEvent<?, ?>>> evts = new ConcurrentHashMap<>(); /** {@inheritDoc} */ - @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) + @Override public synchronized void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) throws CacheEntryListenerException { try { for (CacheEntryEvent<?, ?> evt : evts) { @@ -1026,18 +1355,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo boolean dup = false; - if (prevVal != null) { - if (prevVal.equals(val)) // Can get this event with automatic put retry. 
- dup = true; - else { - assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val); - assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue()); - } - } - else { - assertEquals("Unexpected event: " + evt, (Object)0, val); - assertNull("Unexpected event: " + evt, evt.getOldValue()); - } + if (prevVal != null && prevVal.equals(val)) + dup = true; if (!dup) { vals.put(key, val); @@ -1074,6 +1393,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo private volatile boolean skipMsg; /** */ + private volatile boolean skipAllMsg; + + /** */ private volatile AtomicBoolean sndFirstOnly; /** {@inheritDoc} */ @@ -1081,6 +1403,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo throws IgniteSpiException { Object msg0 = ((GridIoMessage)msg).message(); + if (skipAllMsg) + return; + if (msg0 instanceof GridContinuousMessage) { if (skipMsg) { log.info("Skip continuous message: " + msg0); http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index d133a84..503b992 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1232,7 +1232,7 @@ public abstract class GridAbstractTest extends TestCase { if (isDebug()) { discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE); - cfg.setNetworkTimeout(Long.MAX_VALUE); + cfg.setNetworkTimeout(Long.MAX_VALUE / 3); } else { // Set network timeout to 10 sec to avoid unexpected p2p class loading errors.
