Repository: ignite Updated Branches: refs/heads/ignite-5075-cc [created] 3cc4f9f51
cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3cc4f9f5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3cc4f9f5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3cc4f9f5 Branch: refs/heads/ignite-5075-cc Commit: 3cc4f9f518b45823c91b266269df72c5486c7b89 Parents: c04b39a Author: sboikov <[email protected]> Authored: Wed May 24 12:10:43 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed May 24 14:03:11 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 1 - .../continuous/CacheContinuousQueryEntry.java | 31 +-- .../CacheContinuousQueryEventBuffer.java | 272 +++++++++++++++++++ .../continuous/CacheContinuousQueryHandler.java | 196 ++++--------- .../continuous/GridContinuousBatchAdapter.java | 2 +- .../continuous/GridContinuousProcessor.java | 12 +- .../continuous/GridContinuousQueryBatch.java | 16 +- ...nuousQueryConcurrentPartitionUpdateTest.java | 177 ++++++++++++ .../CacheContinuousQueryEventBufferTest.java | 227 ++++++++++++++++ ...eCacheContinuousQueryImmutableEntryTest.java | 2 - .../IgniteCacheQuerySelfTestSuite3.java | 5 + 11 files changed, 763 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index edf90d0..30c2a33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index bf2a691..e40f83e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -105,12 +105,12 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { @GridToStringInclude private AffinityTopologyVersion topVer; - /** Filtered events. */ - private GridLongList filteredEvts; - /** Keep binary. */ private boolean keepBinary; + /** */ + public long filteredCnt; + /** * Required by {@link Message}. */ @@ -207,13 +207,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** - * @return Size include this event and filtered. - */ - public int size() { - return filteredEvts != null ? filteredEvts.size() + 1 : 1; - } - - /** * @return If entry filtered then will return light-weight <i><b>new entry</b></i> without values and key * (avoid to huge memory consumption), otherwise {@code this}. */ @@ -251,20 +244,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** - * @param cntrs Filtered events. - */ - void filteredEvents(GridLongList cntrs) { - filteredEvts = cntrs; - } - - /** - * @return previous filtered events. - */ - long[] filteredEvents() { - return filteredEvts == null ? null : filteredEvts.array(); - } - - /** * @param cctx Cache context. * @throws IgniteCheckedException In case of error. */ @@ -363,7 +342,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); case 2: - if (!writer.writeMessage("filteredEvts", filteredEvts)) + if (!writer.writeLong("filteredCnt", filteredCnt)) return false; writer.incrementState(); @@ -446,7 +425,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 2: - filteredEvts = reader.readMessage("filteredEvts"); + filteredCnt = reader.readLong("filteredCnt"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java new file mode 100644 index 0000000..b7b3267 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicReference; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class CacheContinuousQueryEventBuffer { + /** */ + private static final int BUF_SIZE = 5; + + /** */ + private AtomicReference<Batch> curBatch = new AtomicReference<>(); + + /** */ + private ConcurrentSkipListMap<Long, Object> pending = new ConcurrentSkipListMap<>(); + + /** + * @return Initial partition counter. + */ + protected long currentPartitionCounter() { + return 0; + } + + /** + * For test purpose only. + * + * @return Current number of filtered events. + */ + long currentFiltered() { + Batch batch = curBatch.get(); + + return batch != null ? batch.filtered : 0; + } + + /** + * @param e Entry to process. + * @return Collected entries to pass to listener (single entry or entries list). + */ + @Nullable Object processEntry(CacheContinuousQueryEntry e) { + return process0(e.updateCounter(), e); + } + + /** + * @param cntr Filtered counter. + * @return Collected entries to pass to listener (single entry or entries list). + */ + @Nullable Object processFiltered(long cntr) { + return process0(cntr, cntr); + } + + /** + * @param cntr Entry counter. + * @param entry Entry. + * @return Collected entries. + */ + private Object process0(long cntr, Object entry) { + assert cntr >= 0 : cntr; + + Batch batch = initBatch(); + + if (batch == null || cntr < batch.startCntr) { + assert entry != null : cntr; + + return entry; + } + + Object res = null; + + if (cntr <= batch.endCntr) + res = batch.processEvent0(null, cntr, entry); + else + pending.put(cntr, entry); + + Batch batch0 = curBatch.get(); + + if (batch0 != batch) { + do { + batch = batch0; + + res = processPending(res, batch); + + batch0 = curBatch.get(); + } + while (batch != batch0); + } + + return res; + } + + /** + * @return Current batch. + */ + @Nullable private Batch initBatch() { + Batch batch = curBatch.get(); + + if (batch != null) + return batch; + + long curCntr = currentPartitionCounter(); + + if (curCntr == -1) + return null; + + batch = new Batch(curCntr + 1, 0L, new Object[BUF_SIZE]); + + if (curBatch.compareAndSet(null, batch)) + return batch; + + return curBatch.get(); + } + + /** + * @param res Current result. + * @param batch Current batch. + * @return New result. + */ + @Nullable private Object processPending(@Nullable Object res, Batch batch) { + if (pending.floorKey(batch.endCntr) != null) { + for (Map.Entry<Long, Object> p : pending.headMap(batch.endCntr, true).entrySet()) { + long cntr = p.getKey(); + + assert cntr >= batch.startCntr : cntr; + + if (cntr <= batch.endCntr && pending.remove(p.getKey()) != null) + res = batch.processEvent0(res, p.getKey(), p.getValue()); + } + } + + return res; + } + + /** + * + */ + private class Batch { + /** */ + private long filtered; + + /** */ + private final long startCntr; + + /** */ + private final long endCntr; + + /** */ + private int lastProc = -1; + + /** */ + private final Object[] evts; + + /** + * @param filtered Number of filtered events before this batch. + * @param evts Events array. + * @param startCntr Start counter. + */ + Batch(long startCntr, long filtered, Object[] evts) { + assert startCntr >= 0; + assert filtered >= 0; + + this.startCntr = startCntr; + this.filtered = filtered; + this.evts = evts; + + endCntr = startCntr + BUF_SIZE - 1; + } + + /** + * @param res Current result. + * @param cntr Event counter. + * @param evt Event. + * @return New result. + */ + @SuppressWarnings("unchecked") + @Nullable private Object processEvent0( + @Nullable Object res, + long cntr, + Object evt) { + int pos = (int)(cntr - startCntr); + + synchronized (this) { + evts[pos] = evt; + + int next = lastProc + 1; + + if (next == pos) { + for (int i = next; i < evts.length; i++) { + Object e = evts[i]; + + if (e != null) { + if (e.getClass() == Long.class) + filtered++; + else { + CacheContinuousQueryEntry evt0 = (CacheContinuousQueryEntry)e; + + if (!evt0.isFiltered()) { + evt0.filteredCnt = filtered; + + filtered = 0; + + if (res == null) + res = evt0; + else { + List<CacheContinuousQueryEntry> resList; + + if (res instanceof CacheContinuousQueryEntry) { + resList = new ArrayList<>(); + + resList.add((CacheContinuousQueryEntry)res); + } + else { + assert res instanceof List : res; + + resList = (List<CacheContinuousQueryEntry>)res; + } + + resList.add(evt0); + + res = resList; + } + } + else + filtered++; + } + + pos = i; + } + else + break; + } + + lastProc = pos; + } + else + return res; + } + + if (pos == evts.length -1) { + Arrays.fill(evts, null); + + Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, filtered, evts); + + curBatch.set(nextBatch); + } + + return res; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index e7706dd..ab70f81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -28,13 +28,11 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; @@ -42,6 +40,7 @@ import javax.cache.event.EventType; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; @@ -59,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener; @@ -67,13 +67,12 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; -import org.apache.ignite.internal.util.GridConcurrentSkipListSet; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteAsyncCallback; @@ -94,7 +93,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private static final long serialVersionUID = 0L; /** */ - private static final int BACKUP_ACK_THRESHOLD = 100; + private static final int BACKUP_ACK_THRESHOLD = + IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD", 100); + + /** */ + private static final int LSNR_MAX_BUF_SIZE = + IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE", 10_000); /** Cache name. */ private String cacheName; @@ -145,7 +149,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private transient ConcurrentMap<Integer, PartitionRecovery> rcvs; /** */ - private transient ConcurrentMap<Integer, EntryBuffer> entryBufs; + private transient ConcurrentMap<Integer, CacheContinuousQueryEventBuffer> entryBufs; /** */ private transient AcknowledgeBuffer ackBuf; @@ -811,13 +815,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (!entry.isFiltered()) prepareEntry(cctx, nodeId, entry); - CacheContinuousQueryEntry e = handleEntry(entry); + Object entryOrList = handleEntry(cctx, entry); - if (e != null) { + if (entryOrList != null) { if (log.isDebugEnabled()) - log.debug("Send the following event to listener: " + e); + log.debug("Send the following event to listener: " + entryOrList); - ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); + ctx.continuous().addNotification(nodeId, routineId, entryOrList, topic, sync, true); } } } @@ -918,10 +922,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** + * @param cctx Cache context. * @param e Entry. * @return Entry. */ - private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) { + private Object handleEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) { assert e != null; assert entryBufs != null; @@ -934,21 +939,32 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler // Initial query entry. // This events should be fired immediately. - if (e.updateCounter() == -1) + if (e.updateCounter() == -1L) return e; - EntryBuffer buf = entryBufs.get(e.partition()); + CacheContinuousQueryEventBuffer buf = entryBufs.get(e.partition()); if (buf == null) { - buf = new EntryBuffer(); + final int part = e.partition(); - EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf); + buf = new CacheContinuousQueryEventBuffer() { + @Override protected long currentPartitionCounter() { + GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false); - if (oldRec != null) - buf = oldRec; + if (locPart == null) + return -1L; + + return locPart.updateCounter(); + } + }; + + CacheContinuousQueryEventBuffer oldBuf = entryBufs.putIfAbsent(e.partition(), buf); + + if (oldBuf != null) + buf = oldBuf; } - return buf.handle(e); + return buf.processEntry(e); } /** @@ -959,7 +975,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry(); /** */ - private final static int MAX_BUFF_SIZE = 100; + private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE; /** */ private IgniteLogger log; @@ -1084,11 +1100,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (entry.updateCounter() > lastFiredEvt) { pendingEvts.put(entry.updateCounter(), entry); - // Put filtered events. - if (entry.filteredEvents() != null) { - for (long cnrt : entry.filteredEvents()) { - if (cnrt > lastFiredEvt) - pendingEvts.put(cnrt, HOLE); + // TODO + if (entry.filteredCnt > 0) { + long filteredCntr = entry.updateCounter() - entry.filteredCnt; + + for (long i = 0; i < entry.filteredCnt; i++) { + pendingEvts.put(filteredCntr, HOLE); + + filteredCntr++; } } } @@ -1115,6 +1134,17 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler entries = new ArrayList<>(); if (pendingEvts.size() >= MAX_BUFF_SIZE) { + if (log.isDebugEnabled()) { + log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt + + ", curTop=" + curTop + + ", entUpdCnt=" + entry.updateCounter() + + ", partId=" + entry.partition() + + ", pendingEvts=" + pendingEvts + ']'); + } + + LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() + + ", partId=" + entry.partition() + ']'); + for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) { Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); @@ -1125,14 +1155,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler iter.remove(); } - - if (log.isDebugEnabled()) { - log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - } } else { // Elements are consistently. @@ -1166,116 +1188,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - /** - * - */ - private static class EntryBuffer { - /** */ - private final static int MAX_BUFF_SIZE = 100; - - /** */ - private final GridConcurrentSkipListSet<Long> buf = new GridConcurrentSkipListSet<>(); - - /** */ - private AtomicLong lastFiredCntr = new AtomicLong(); - - /** - * @param newVal New value. - * @return Old value if previous value less than new value otherwise {@code -1}. - */ - private long updateFiredCounter(long newVal) { - long prevVal = lastFiredCntr.get(); - - while (prevVal < newVal) { - if (lastFiredCntr.compareAndSet(prevVal, newVal)) - return prevVal; - else - prevVal = lastFiredCntr.get(); - } - - return prevVal >= newVal ? -1 : prevVal; - } - - /** - * Add continuous entry. - * - * @param e Cache continuous query entry. - * @return Collection entries which will be fired. - */ - public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) { - assert e != null; - - if (e.isFiltered()) { - Long last = buf.lastx(); - Long first = buf.firstx(); - - if (last != null && first != null && last - first >= MAX_BUFF_SIZE) { - NavigableSet<Long> prevHoles = buf.subSet(first, true, last, true); - - GridLongList filteredEvts = new GridLongList((int)(last - first)); - - int size = 0; - - Long cntr; - - while ((cntr = prevHoles.pollFirst()) != null) { - filteredEvts.add(cntr); - - ++size; - } - - filteredEvts.truncate(size, true); - - e.filteredEvents(filteredEvts); - - return e; - } - - if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) - return e; - else { - buf.add(e.updateCounter()); - - // Double check. If another thread sent a event with counter higher than this event. - if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) { - buf.remove(e.updateCounter()); - - return e; - } - else - return null; - } - } - else { - long prevVal = updateFiredCounter(e.updateCounter()); - - if (prevVal == -1) - return e; - else { - NavigableSet<Long> prevHoles = buf.subSet(prevVal, true, e.updateCounter(), true); - - GridLongList filteredEvts = new GridLongList((int)(e.updateCounter() - prevVal)); - - int size = 0; - - Long cntr; - - while ((cntr = prevHoles.pollFirst()) != null) { - filteredEvts.add(cntr); - - ++size; - } - - filteredEvts.truncate(size, true); - - e.filteredEvents(filteredEvts); - - return e; - } - } - } - } - /** {@inheritDoc} */ @Override public void onNodeLeft() { Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java index 4540de1..597eae8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java @@ -25,7 +25,7 @@ import org.jsr166.ConcurrentLinkedDeque8; */ public class GridContinuousBatchAdapter implements GridContinuousBatch { /** Buffer. */ - private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>(); + protected final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>(); /** {@inheritDoc} */ @Override public void add(Object obj) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index abcd8ea..da951f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -897,7 +897,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { throws IgniteCheckedException { assert nodeId != null; assert routineId != null; - assert !msg || obj instanceof Message : obj; + assert !msg || (obj instanceof Message || obj instanceof Collection) : obj; assert !nodeId.equals(ctx.localNodeId()); @@ -917,7 +917,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter { syncMsgFuts.put(futId, fut); try { - sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg, null); + sendNotification(nodeId, + routineId, + futId, + obj instanceof Collection ? (Collection)obj : F.asList(obj), + null, + msg, + null); info.hnd.onBatchAcknowledged(routineId, info.add(obj), ctx); } @@ -1563,7 +1569,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridContinuousBatch addAll(Collection<?> objs) { assert objs != null; - GridContinuousBatch toSnd = null; + GridContinuousBatch toSnd; lock.writeLock().lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java index c5d854b..0eba44b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.continuous; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry; @@ -31,11 +32,20 @@ public class GridContinuousQueryBatch extends GridContinuousBatchAdapter { /** {@inheritDoc} */ @Override public void add(Object obj) { assert obj != null; - assert obj instanceof CacheContinuousQueryEntry; + assert obj instanceof CacheContinuousQueryEntry || obj instanceof List; - super.add(obj); + if (obj instanceof CacheContinuousQueryEntry) { + buf.add(obj); - size.addAndGet(((CacheContinuousQueryEntry)obj).size()); + size.incrementAndGet(); + } + else { + List<Object> objs = (List<Object>)obj; + + buf.addAll(objs); + + size.addAndGet(objs.size()); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java new file mode 100644 index 0000000..5cdcc98 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatePartitionAtomic() throws Exception { + concurrentUpdatePartition(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdatePartitionTx() throws Exception { + concurrentUpdatePartition(TRANSACTIONAL); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + private void concurrentUpdatePartition(CacheAtomicityMode atomicityMode) throws Exception { + Ignite srv = startGrid(0); + + client = true; + + Ignite client = startGrid(1); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); + + IgniteCache clientCache = client.createCache(ccfg); + + final AtomicInteger evtCnt = new AtomicInteger(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent evt : evts) { + assertNotNull(evt.getKey()); + assertNotNull(evt.getValue()); + + evtCnt.incrementAndGet(); + } + } + }); + + clientCache.query(qry); + + Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME); + + final List<Integer> keys = new ArrayList<>(); + + final int KEYS = 10; + + for (int i = 0; i < 100_000; i++) { + if (aff.partition(i) == 0) { + keys.add(i); + + if (keys.size() == KEYS) + break; + } + } + + assertEquals(KEYS, keys.size()); + + final int THREADS = 10; + final int UPDATES = 1000; + + final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 100; i++) { + log.info("Iteration: " + i); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < UPDATES; i++) + srvCache.put(keys.get(rnd.nextInt(KEYS)), i); + + return null; + } + }, THREADS, "update"); + + log.info("Finished update"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + log.info("Events: " + evtCnt.get()); + + return evtCnt.get() >= THREADS * UPDATES; + } + }, 5000); + + assertEquals(THREADS * UPDATES, evtCnt.get()); + + evtCnt.set(0); + } + } + + public void testConcurrentUpdateAndQueryStart() throws Exception { + + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java new file mode 100644 index 0000000..75a664c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CyclicBarrier; +import javax.cache.event.EventType; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +@SuppressWarnings("unchecked") +public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest { + /** + * @throws Exception If failed. + */ + public void testBuffer1() throws Exception { + testBuffer(1); + } + + /** + * @throws Exception If failed. + */ + public void testBuffer2() throws Exception { + for (int i = 0; i < 10; i++) { + log.info("Iteration: " + i); + + testBuffer(10); + } + } + + /** + * @param threads Threads number. + * @throws Exception If failed. + */ + private void testBuffer(int threads) throws Exception { + long seed = System.nanoTime(); + + Random rnd = new Random(seed); + + log.info("Start test, seed: " + seed); + + for (int i = 0; i < 10; i++) { + int cnt = rnd.nextInt(10_000) + 1; + + testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.5f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.9f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.99f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.01f, threads); + testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.f, threads); + } + + CacheContinuousQueryEventBuffer b = new CacheContinuousQueryEventBuffer(); + + long cntr = 1; + + for (int i = 0; i < 10; i++) { + int cnt = rnd.nextInt(10_000) + 1; + float ratio = rnd.nextFloat(); + + testBuffer(rnd, b, cnt, cntr, ratio, threads); + + cntr += cnt; + } + } + + /** + * @param rnd Random. + * @param b Buffer. + * @param cnt Entries count. + * @param cntr Current counter. + * @param filterRatio Filtered events ratio. + * @param threads Threads number. + * @throws Exception If failed. + */ + private void testBuffer(Random rnd, + final CacheContinuousQueryEventBuffer b, + int cnt, + long cntr, + float filterRatio, + int threads) + throws Exception + { + List<CacheContinuousQueryEntry> expEntries = new ArrayList<>(); + + List<Object> entries = new ArrayList<>(); + + long filtered = b.currentFiltered(); + + for (int i = 0; i < cnt; i++) { + if (rnd.nextFloat() < filterRatio) { + entries.add(cntr); + + cntr++; + + filtered++; + } + else { + CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry( + 0, + EventType.CREATED, + null, + null, + null, + false, + 0, + cntr, + null); + + entries.add(entry); + + CacheContinuousQueryEntry expEntry = new CacheContinuousQueryEntry( + 0, + EventType.CREATED, + null, + null, + null, + false, + 0, + cntr, + null); + + expEntry.filteredCnt = filtered; + + cntr++; + + expEntries.add(expEntry); + + filtered = 0; + } + } + + Collections.shuffle(entries, rnd); + + List<CacheContinuousQueryEntry> actualEntries = new ArrayList<>(expEntries.size()); + + if (threads == 1) { + for (int i = 0; i < entries.size(); i++) { + Object o = entries.get(i); + + Object res; + + if (o instanceof Long) + res = b.processFiltered((Long)o); + else + res = b.processEntry((CacheContinuousQueryEntry)o); + + if (res != null) { + if (res instanceof CacheContinuousQueryEntry) + actualEntries.add((CacheContinuousQueryEntry)res); + else + actualEntries.addAll((List<CacheContinuousQueryEntry>)res); + } + } + } + else { + final CyclicBarrier barrier = new CyclicBarrier(threads); + + final ConcurrentLinkedQueue<Object> q = new ConcurrentLinkedQueue<>(entries); + + final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> act0 = new ConcurrentSkipListMap<>(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + barrier.await(); + + Object o; + + while ((o = q.poll()) != null) { + Object res; + + if (o instanceof Long) + res = b.processFiltered((Long)o); + else + res = b.processEntry((CacheContinuousQueryEntry)o); + + if (res != null) { + if (res instanceof CacheContinuousQueryEntry) + act0.put(((CacheContinuousQueryEntry)res).updateCounter(), (CacheContinuousQueryEntry)res); + else { + for (CacheContinuousQueryEntry e : ((List<CacheContinuousQueryEntry>)res)) + act0.put(e.updateCounter(), e); + } + } + } + + return null; + } + }, threads, "test"); + + actualEntries.addAll(act0.values()); + } + + assertEquals(expEntries.size(), actualEntries.size()); + + for (int i = 0; i < expEntries.size(); i++) { + CacheContinuousQueryEntry expEvt = expEntries.get(i); + CacheContinuousQueryEntry actualEvt = actualEntries.get(i); + + assertEquals(expEvt.updateCounter(), actualEvt.updateCounter()); + assertEquals(expEvt.filteredCnt, actualEvt.filteredCnt); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java index b91217f..d230320 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java @@ -140,7 +140,6 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst 1L, new AffinityTopologyVersion(1L)); - e0.filteredEvents(new GridLongList(new long[]{1L, 2L})); e0.markFiltered(); ByteBuffer buf = ByteBuffer.allocate(4096); @@ -156,7 +155,6 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst assertEquals(e0.cacheId(), e1.cacheId()); assertEquals(e0.eventType(), e1.eventType()); assertEquals(e0.isFiltered(), e1.isFiltered()); - assertEquals(GridLongList.asList(e0.filteredEvents()), GridLongList.asList(e1.filteredEvents())); assertEquals(e0.isBackup(), e1.isBackup()); assertEquals(e0.isKeepBinary(), e1.isKeepBinary()); assertEquals(e0.partition(), e1.partition()); http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index 8dd273a..0084cdc 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -21,6 +21,8 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFilterListenerTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryConcurrentPartitionUpdateTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBufferTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryExecuteInPrimaryTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryAsyncFilterRandomOperationTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest; @@ -118,6 +120,9 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(ContinuousQueryPeerClassLoadingTest.class); suite.addTestSuite(ClientReconnectContinuousQueryTest.class); + suite.addTestSuite(CacheContinuousQueryConcurrentPartitionUpdateTest.class); + suite.addTestSuite(CacheContinuousQueryEventBufferTest.class); + suite.addTest(IgniteDistributedJoinTestSuite.suite()); return suite;
