Continuous queries fixes: - flush backup queue on exchange end (otherwise we don't really wait for all current operations) - on coordinator, apply counters after all single messages are received (otherwise extra counter increments are possible) - do not send info about filtered entries if there is no non-filtered entry - added system properties for hardcoded constants
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42293fac Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42293fac Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42293fac Branch: refs/heads/ignite-5075 Commit: 42293fac88c29544b7c55c0340224afbf474a301 Parents: 827b7f6 Author: sboikov <[email protected]> Authored: Mon May 29 16:41:23 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon May 29 16:41:23 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 6 +- .../processors/cache/GridCacheMapEntry.java | 5 +- .../GridCachePartitionExchangeManager.java | 2 +- .../dht/GridClientPartitionTopology.java | 31 +- .../dht/GridDhtPartitionTopology.java | 9 +- .../dht/GridDhtPartitionTopologyImpl.java | 59 +- .../GridDhtPartitionsExchangeFuture.java | 51 +- .../CacheContinuousQueryAcknowledgeBuffer.java | 120 +++ .../CacheContinuousQueryDeployableObject.java | 110 +++ .../continuous/CacheContinuousQueryEntry.java | 117 ++- .../CacheContinuousQueryEventBuffer.java | 483 ++++++++++++ .../continuous/CacheContinuousQueryHandler.java | 733 +++---------------- .../CacheContinuousQueryHandlerV2.java | 6 +- .../continuous/CacheContinuousQueryManager.java | 16 +- .../CacheContinuousQueryPartitionRecovery.java | 267 +++++++ .../continuous/GridContinuousBatchAdapter.java | 2 +- .../continuous/GridContinuousProcessor.java | 19 +- .../continuous/GridContinuousQueryBatch.java | 16 +- ...tinuousQueryAsyncFailoverAtomicSelfTest.java | 1 - ...nuousQueryConcurrentPartitionUpdateTest.java | 304 ++++++++ .../CacheContinuousQueryEventBufferTest.java | 217 ++++++ ...ContinuousQueryFailoverAbstractSelfTest.java | 79 +- ...niteCacheContinuousQueryBackupQueueTest.java | 13 +- ...eCacheContinuousQueryImmutableEntryTest.java | 6 +- .../IgniteCacheQuerySelfTestSuite3.java | 5 + 25 files changed, 1885 insertions(+), 792 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index fdd29e4..bb31645 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -874,7 +874,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { finally { // Reset thread local context. cctx.tm().resetContext(); - cctx.mvcc().contextReset(); + + GridCacheMvccManager mvcc = cctx.mvcc(); + + if (mvcc != null) + mvcc.contextReset(); // Unwind eviction notifications. 
if (msg instanceof IgniteTxStateAware) { http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 4f87658..7c7fc99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -29,7 +28,6 @@ import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; - import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -62,8 +60,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; -import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; +import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.lang.GridTuple; @@ -76,7 +74,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import 
org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED; http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 5314088..2eec8f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1312,7 +1312,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) { - updated |= top.update(null, entry.getValue(), null) != null; + updated |= top.update(null, entry.getValue()) != null; cctx.affinity().checkRebalanceState(top, cacheId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 1de64c5..43bc609 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -650,11 +650,29 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) { + assert cntrMap != null; + + lock.writeLock().lock(); + + try { + for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { + T2<Long, Long> cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr.get2() < e.getValue().get2()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update( @Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts, - Map<Integer, T2<Long, Long>> cntrMap + GridDhtPartitionMap parts ) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -733,15 +751,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } } - if (cntrMap != null) { - for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { - T2<Long, Long> cntr = this.cntrMap.get(e.getKey()); - - if (cntr == null || cntr.get2() < e.getValue().get2()) - this.cntrMap.put(e.getKey(), e.getValue()); - } - } - consistencyCheck(); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index f9fd852..ffc1d63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -234,12 +234,15 @@ public interface GridDhtPartitionTopology { /** * @param exchId Exchange ID. * @param parts Partitions. - * @param cntrMap Partition update counters. * @return Local partition map if there were evictions or {@code null} otherwise. */ @Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts, - @Nullable Map<Integer, T2<Long, Long>> cntrMap); + GridDhtPartitionMap parts); + + /** + * @param cntrMap Counters map. + */ + public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap); /** * Checks if there is at least one owner for each partition in the cache topology. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 8e79eda..7adce6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1256,11 +1256,45 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) { + assert cntrMap != null; + + lock.writeLock().lock(); + + try { + if (stopping) + return; + + for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { + T2<Long, Long> cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr.get2() < e.getValue().get2()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + + for (int i = 0; i < locParts.length(); i++) { + GridDhtLocalPartition part = locParts.get(i); + + if (part == null) + continue; + + T2<Long, Long> cntr = cntrMap.get(part.id()); + + if (cntr != null && cntr.get2() > part.updateCounter()) + part.updateCounter(cntr.get2()); + } + } + finally { + lock.writeLock().unlock(); + + } + } + + /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update( @Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts, - @Nullable Map<Integer, T2<Long, Long>> cntrMap + GridDhtPartitionMap parts ) { if (log.isDebugEnabled()) log.debug("Updating 
single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -1279,27 +1313,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return null; - if (cntrMap != null) { - for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { - T2<Long, Long> cntr = this.cntrMap.get(e.getKey()); - - if (cntr == null || cntr.get2() < e.getValue().get2()) - this.cntrMap.put(e.getKey(), e.getValue()); - } - - for (int i = 0; i < locParts.length(); i++) { - GridDhtLocalPartition part = locParts.get(i); - - if (part == null) - continue; - - T2<Long, Long> cntr = cntrMap.get(part.id()); - - if (cntr != null && cntr.get2() > part.updateCounter()) - part.updateCounter(cntr.get2()); - } - } - if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 544f847..72c5bbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import 
org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.PartitionLossPolicy; @@ -47,18 +46,19 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; -import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -789,14 +789,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT 
boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null; - //todo check for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId())) continue; if (topChanged) { - cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion()); - // Partition release future is done so we can flush the write-behind store. cacheCtx.store().forceFlush(); } @@ -1101,10 +1098,31 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } + /** + * @return {@code True} if exchange triggered by server node join or fail. + */ + private boolean serverNodeDiscoveryEvent() { + assert discoEvt != null; + + return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode()); + } + /** {@inheritDoc} */ @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) { boolean realExchange = !dummy && !forcePreload; + if (err == null && + realExchange && + !cctx.kernalContext().clientNode() && + (serverNodeDiscoveryEvent() || affChangeMsg != null)) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.affinityNode() || cacheCtx.isLocal()) + continue; + + cacheCtx.continuousQueries().flushBackupQueue(exchId.topologyVersion()); + } + } + if (err == null && realExchange) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) @@ -1554,6 +1572,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } + for (GridDhtPartitionsAbstractMessage msg : msgs.values()) { + if (msg instanceof GridDhtPartitionsSingleMessage) { + GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg; + + for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) { + Integer cacheId = entry.getKey(); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + GridDhtPartitionTopology top = cacheCtx != null ? 
cacheCtx.topology() : + cctx.exchange().clientTopology(cacheId, this); + + Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(cacheId); + + if (cntrs != null) + top.applyUpdateCounters(cntrs); + } + } + } + if (discoEvt.type() == EVT_NODE_JOINED) { if (cctx.kernalContext().state().active()) assignPartitionsStates(); @@ -1785,7 +1822,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : cctx.exchange().clientTopology(cacheId, this); - top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId)); + top.update(exchId, entry.getValue()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java new file mode 100644 index 0000000..c95dc42 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; +import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +/** */ +class CacheContinuousQueryAcknowledgeBuffer { + /** */ + private int size; + + /** */ + @GridToStringInclude + private Map<Integer, Long> updateCntrs = new HashMap<>(); + + /** */ + @GridToStringInclude + private Set<AffinityTopologyVersion> topVers = U.newHashSet(1); + + /** + * @param batch Batch. + * @return Non-null tuple if acknowledge should be sent to backups. 
+ */ + @SuppressWarnings("unchecked") + @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>onAcknowledged( + GridContinuousBatch batch) { + assert batch instanceof GridContinuousQueryBatch; + + size += ((GridContinuousQueryBatch)batch).entriesCount(); + + Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect(); + + for (CacheContinuousQueryEntry e : entries) + addEntry(e); + + return size >= CacheContinuousQueryHandler.BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; + } + + /** + * @param e Entry. + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> + onAcknowledged(CacheContinuousQueryEntry e) { + size++; + + addEntry(e); + + return size >= CacheContinuousQueryHandler.BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; + } + + /** + * @param e Entry. + */ + private void addEntry(CacheContinuousQueryEntry e) { + topVers.add(e.topologyVersion()); + + Long cntr0 = updateCntrs.get(e.partition()); + + if (cntr0 == null || e.updateCounter() > cntr0) + updateCntrs.put(e.partition(), e.updateCounter()); + } + + /** + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> + acknowledgeOnTimeout() { + return size > 0 ? acknowledgeData() : null; + } + + /** + * @return Tuple with acknowledge information. 
+ */ + private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() { + assert size > 0; + + Map<Integer, Long> cntrs = new HashMap<>(updateCntrs); + + IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res = + new IgniteBiTuple<>(cntrs, topVers); + + topVers = U.newHashSet(1); + + size = 0; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryAcknowledgeBuffer.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java new file mode 100644 index 0000000..f888467 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.managers.deployment.GridDeployment; +import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; +import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Deployable object. + */ +class CacheContinuousQueryDeployableObject implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Serialized object. */ + private byte[] bytes; + + /** Deployment class name. */ + private String clsName; + + /** Deployment info. */ + private GridDeploymentInfo depInfo; + + /** + * Required by {@link Externalizable}. + */ + public CacheContinuousQueryDeployableObject() { + // No-op. + } + + /** + * @param obj Object. + * @param ctx Kernal context. + * @throws IgniteCheckedException In case of error. + */ + protected CacheContinuousQueryDeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException { + assert obj != null; + assert ctx != null; + + Class cls = U.detectClass(obj); + + clsName = cls.getName(); + + GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj); + + depInfo = new GridDeploymentInfoBean(dep); + + bytes = U.marshal(ctx, obj); + } + + /** + * @param nodeId Node ID. + * @param ctx Kernal context. + * @return Deserialized object. 
+ * @throws IgniteCheckedException In case of error. + */ + <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { + assert ctx != null; + + GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, + depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); + + return U.unmarshal(ctx, bytes, U.resolveClassLoader(dep.classLoader(), ctx.config())); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeByteArray(out, bytes); + U.writeString(out, clsName); + out.writeObject(depInfo); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + bytes = U.readByteArray(in); + clsName = U.readString(in); + depInfo = (GridDeploymentInfo)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index bf2a691..7e3f0b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -51,6 +51,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { private static final byte FILTERED_ENTRY = 0b0010; /** */ + private static final byte KEEP_BINARY = 0b0100; + + 
/** */ private static final EventType[] EVT_TYPE_VALS = EventType.values(); /** @@ -105,11 +108,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { @GridToStringInclude private AffinityTopologyVersion topVer; - /** Filtered events. */ - private GridLongList filteredEvts; - - /** Keep binary. */ - private boolean keepBinary; + /** */ + private long filteredCnt; /** * Required by {@link Message}. @@ -124,9 +124,11 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @param key Key. * @param newVal New value. * @param oldVal Old value. + * @param keepBinary Keep binary flag. * @param part Partition. * @param updateCntr Update partition counter. * @param topVer Topology version if applicable. + * @param flags Flags. */ CacheContinuousQueryEntry( int cacheId, @@ -137,7 +139,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { boolean keepBinary, int part, long updateCntr, - @Nullable AffinityTopologyVersion topVer) { + @Nullable AffinityTopologyVersion topVer, + byte flags) { this.cacheId = cacheId; this.evtType = evtType; this.key = key; @@ -146,7 +149,17 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { this.part = part; this.updateCntr = updateCntr; this.topVer = topVer; - this.keepBinary = keepBinary; + this.flags = flags; + + if (keepBinary) + this.flags |= KEEP_BINARY; + } + + /** + * @return Flags. + */ + public byte flags() { + return flags; } /** @@ -207,26 +220,40 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** - * @return Size include this event and filtered. + * @param filteredCnt Number of entries filtered before this entry. + */ + void filteredCount(long filteredCnt) { + assert filteredCnt >= 0 : filteredCnt; + + this.filteredCnt = filteredCnt; + } + + /** + * @return Number of entries filtered before this entry. */ - public int size() { - return filteredEvts != null ? 
filteredEvts.size() + 1 : 1; + long filteredCount() { + return filteredCnt; } /** * @return If entry filtered then will return light-weight <i><b>new entry</b></i> without values and key * (avoid to huge memory consumption), otherwise {@code this}. */ - CacheContinuousQueryEntry forBackupQueue() { + CacheContinuousQueryEntry copyWithDataReset() { if (!isFiltered()) return this; - CacheContinuousQueryEntry e = new CacheContinuousQueryEntry( - cacheId, null, null, null, null, keepBinary, part, updateCntr, topVer); - - e.flags = flags; - - return e; + return new CacheContinuousQueryEntry( + cacheId, + null, + null, + null, + null, + false, + part, + updateCntr, + topVer, + flags); } /** @@ -247,21 +274,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @return Keep binary flag. */ boolean isKeepBinary() { - return keepBinary; - } - - /** - * @param cntrs Filtered events. - */ - void filteredEvents(GridLongList cntrs) { - filteredEvts = cntrs; - } - - /** - * @return previous filtered events. - */ - long[] filteredEvents() { - return filteredEvts == null ? null : filteredEvts.array(); + return (flags & KEEP_BINARY) != 0; } /** @@ -363,7 +376,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); case 2: - if (!writer.writeMessage("filteredEvts", filteredEvts)) + if (!writer.writeLong("filteredCnt", filteredCnt)) return false; writer.incrementState(); @@ -375,42 +388,36 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); case 4: - if (!writer.writeBoolean("keepBinary", keepBinary)) - return false; - - writer.incrementState(); - - case 5: if (!writer.writeMessage("key", isFiltered() ? null : key)) return false; writer.incrementState(); - case 6: + case 5: if (!writer.writeMessage("newVal", isFiltered() ? 
null : newVal)) return false; writer.incrementState(); - case 7: + case 6: if (!writer.writeMessage("oldVal", isFiltered() ? null : oldVal)) return false; writer.incrementState(); - case 8: + case 7: if (!writer.writeInt("part", part)) return false; writer.incrementState(); - case 9: + case 8: if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); - case 10: + case 9: if (!writer.writeLong("updateCntr", updateCntr)) return false; @@ -446,7 +453,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 2: - filteredEvts = reader.readMessage("filteredEvts"); + filteredCnt = reader.readLong("filteredCnt"); if (!reader.isLastRead()) return false; @@ -462,14 +469,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 4: - keepBinary = reader.readBoolean("keepBinary"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: key = reader.readMessage("key"); if (!reader.isLastRead()) @@ -477,7 +476,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 6: + case 5: newVal = reader.readMessage("newVal"); if (!reader.isLastRead()) @@ -485,7 +484,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 7: + case 6: oldVal = reader.readMessage("oldVal"); if (!reader.isLastRead()) @@ -493,7 +492,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 8: + case 7: part = reader.readInt("part"); if (!reader.isLastRead()) @@ -501,7 +500,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); - case 9: + case 8: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -509,7 +508,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, 
Message { reader.incrementState(); - case 10: + case 9: updateCntr = reader.readLong("updateCntr"); if (!reader.isLastRead()) @@ -524,7 +523,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java new file mode 100644 index 0000000..336f650 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentLinkedDeque8; + +/** + * + */ +public class CacheContinuousQueryEventBuffer { + /** */ + private static final int BUF_SIZE = + IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 1000); + + /** */ + private static final Object RETRY = new Object(); + + /** */ + protected final int part; + + /** */ + private AtomicReference<Batch> curBatch = new AtomicReference<>(); + + /** */ + private ConcurrentLinkedDeque8<CacheContinuousQueryEntry> backupQ = new ConcurrentLinkedDeque8<>(); + + /** */ + private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>(); + + /** + * @param part Partition number. + */ + CacheContinuousQueryEventBuffer(int part) { + this.part = part; + } + + /** + * @param updateCntr Acknowledged counter. + */ + void cleanupBackupQueue(Long updateCntr) { + Iterator<CacheContinuousQueryEntry> it = backupQ.iterator(); + + while (it.hasNext()) { + CacheContinuousQueryEntry backupEntry = it.next(); + + if (backupEntry.updateCounter() <= updateCntr) + it.remove(); + } + } + + /** + * @return Backup entries. 
+ */ + @Nullable Collection<CacheContinuousQueryEntry> flushOnExchange() { + TreeMap<Long, CacheContinuousQueryEntry> ret = null; + + int size = backupQ.sizex(); + + if (size > 0) { + ret = new TreeMap<>(); + + for (int i = 0; i < size; i++) { + CacheContinuousQueryEntry e = backupQ.pollFirst(); + + if (e != null) + ret.put(e.updateCounter(), e); + else + break; + } + } + + Batch batch = curBatch.get(); + + if (batch != null) + ret = batch.flushCurrentEntries(ret); + + if (!pending.isEmpty()) { + if (ret == null) + ret = new TreeMap<>(); + + for (CacheContinuousQueryEntry e : pending.values()) + ret.put(e.updateCounter(), e); + } + + return ret != null ? ret.values() : null; + } + + /** + * @return Initial partition counter. + */ + protected long currentPartitionCounter() { + return 0; + } + + /** + * For test purpose only. + * + * @return Current number of filtered events. + */ + long currentFiltered() { + Batch batch = curBatch.get(); + + return batch != null ? batch.filtered : 0; + } + + /** + * @param e Entry to process. + * @param backup Backup entry flag. + * @return Collected entries to pass to listener (single entry or entries list). + */ + @Nullable Object processEntry(CacheContinuousQueryEntry e, boolean backup) { + return process0(e.updateCounter(), e, backup); + } + + /** + * @param backup Backup entry flag. + * @param cntr Entry counter. + * @param entry Entry. + * @return Collected entries. 
+ */ + private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) { + assert cntr >= 0 : cntr; + + Batch batch; + Object res = null; + + for (;;) { + batch = initBatch(entry.topologyVersion()); + + if (batch == null || cntr < batch.startCntr) { + if (backup) + backupQ.add(entry); + + return entry; + } + + if (cntr <= batch.endCntr) { + res = batch.processEntry0(null, cntr, entry, backup); + + if (res == RETRY) + continue; + } + else + pending.put(cntr, entry); + + break; + } + + Batch batch0 = curBatch.get(); + + if (batch0 != batch) { + do { + batch = batch0; + + res = processPending(res, batch, backup); + + batch0 = initBatch(entry.topologyVersion()); + } + while (batch != batch0); + } + + return res; + } + + /** + * @param topVer Current event topology version. + * @return Current batch. + */ + @Nullable private Batch initBatch(AffinityTopologyVersion topVer) { + Batch batch = curBatch.get(); + + if (batch != null) + return batch; + + for (;;) { + long curCntr = currentPartitionCounter(); + + if (curCntr == -1) + return null; + + batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer); + + if (curBatch.compareAndSet(null, batch)) + return batch; + + batch = curBatch.get(); + + if (batch != null) + return batch; + } + } + + /** + * @param res Current result. + * @param batch Current batch. + * @param backup Backup entry flag. + * @return New result. 
+ */ + @Nullable private Object processPending(@Nullable Object res, Batch batch, boolean backup) { + if (pending.floorKey(batch.endCntr) != null) { + for (Map.Entry<Long, CacheContinuousQueryEntry> p : pending.headMap(batch.endCntr, true).entrySet()) { + long cntr = p.getKey(); + + assert cntr <= batch.endCntr; + + if (pending.remove(p.getKey()) != null) { + if (cntr < batch.startCntr) + res = addResult(res, p.getValue(), backup); + else + res = batch.processEntry0(res, p.getKey(), p.getValue(), backup); + } + } + } + + return res; + } + + /** + * @param res Current result. + * @param entry Entry to add. + * @param backup Backup entry flag. + * @return Updated result. + */ + @Nullable private Object addResult(@Nullable Object res, CacheContinuousQueryEntry entry, boolean backup) { + if (res == null) { + if (backup) + backupQ.add(entry); + else + res = entry; + } + else { + assert !backup; + + List<CacheContinuousQueryEntry> resList; + + if (res instanceof CacheContinuousQueryEntry) { + resList = new ArrayList<>(); + + resList.add((CacheContinuousQueryEntry)res); + } + else { + assert res instanceof List : res; + + resList = (List<CacheContinuousQueryEntry>)res; + } + + resList.add(entry); + + res = resList; + } + + return res; + } + + /** + * + */ + private class Batch { + /** */ + private long filtered; + + /** */ + private final long startCntr; + + /** */ + private final long endCntr; + + /** */ + private int lastProc = -1; + + /** */ + private CacheContinuousQueryEntry[] entries; + + /** */ + private final AffinityTopologyVersion topVer; + + /** + * @param filtered Number of filtered events before this batch. + * @param entries Entries array. + * @param topVer Current event topology version. + * @param startCntr Start counter. 
+ */ + Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) { + assert startCntr >= 0; + assert filtered >= 0; + + this.startCntr = startCntr; + this.filtered = filtered; + this.entries = entries; + this.topVer = topVer; + + endCntr = startCntr + BUF_SIZE - 1; + } + + /** + * @param res Current entries. + * @return Entries to send as part of backup queue. + */ + @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries( + @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) { + if (entries == null) + return res; + + long filtered = this.filtered; + long cntr = startCntr; + + for (int i = 0; i < entries.length; i++) { + CacheContinuousQueryEntry e = entries[i]; + + CacheContinuousQueryEntry flushEntry = null; + + if (e == null) { + if (filtered != 0) { + flushEntry = filteredEntry(cntr - 1, filtered - 1); + + filtered = 0; + } + } + else { + if (e.isFiltered()) + filtered++; + else { + flushEntry = new CacheContinuousQueryEntry(e.cacheId(), + e.eventType(), + e.key(), + e.value(), + e.oldValue(), + e.isKeepBinary(), + e.partition(), + e.updateCounter(), + e.topologyVersion(), + e.flags()); + + flushEntry.filteredCount(filtered); + + filtered = 0; + } + } + + if (flushEntry != null) { + if (res == null) + res = new TreeMap<>(); + + res.put(flushEntry.updateCounter(), flushEntry); + } + + cntr++; + } + + if (filtered != 0L) { + if (res == null) + res = new TreeMap<>(); + + CacheContinuousQueryEntry flushEntry = filteredEntry(cntr - 1, filtered - 1); + + res.put(flushEntry.updateCounter(), flushEntry); + } + + return res; + } + + /** + * @param cntr Entry counter. + * @param filtered Number of entries filtered before this entry. + * @return Entry. 
+ */ + private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) { + CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(0, + null, + null, + null, + null, + false, + part, + cntr, + topVer, + (byte)0); + + e.markFiltered(); + + e.filteredCount(filtered); + + return e; + } + + /** + * @param res Current result. + * @param cntr Entry counter. + * @param entry Entry. + * @param backup Backup entry flag. + * @return New result. + */ + @SuppressWarnings("unchecked") + @Nullable private Object processEntry0( + @Nullable Object res, + long cntr, + CacheContinuousQueryEntry entry, + boolean backup) { + int pos = (int)(cntr - startCntr); + + synchronized (this) { + if (entries == null) + return RETRY; + + entry = entry.copyWithDataReset(); + + entries[pos] = entry; + + int next = lastProc + 1; + + if (next == pos) { + for (int i = next; i < entries.length; i++) { + CacheContinuousQueryEntry entry0 = entries[i]; + + if (entry0 != null) { + if (!entry0.isFiltered()) { + entry0.filteredCount(filtered); + + filtered = 0; + + res = addResult(res, entry0, backup); + } + else + filtered++; + + pos = i; + } + else + break; + } + + lastProc = pos; + + if (pos == entries.length - 1) { + Arrays.fill(entries, null); + + Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, + filtered, + entries, + entry.topologyVersion()); + + entries = null; + + assert curBatch.get() == this; + + curBatch.set(nextBatch); + } + } + else + return res; + } + + return res; + } + } +} \ No newline at end of file
