IGNITE-426 Added cache continuos query probe. Implemented for TX.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/26408c4b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/26408c4b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/26408c4b Branch: refs/heads/ignite-426-2-reb Commit: 26408c4b17f5a3462701ef1af98a0994c1246c9d Parents: 88ecfd4 Author: nikolay_tikhonov <[email protected]> Authored: Wed Oct 21 15:56:36 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Wed Oct 28 15:24:23 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 8 +- .../processors/cache/GridCacheMapEntry.java | 39 ++-- .../cache/GridCacheUpdateTxResult.java | 30 ++- .../GridDistributedTxRemoteAdapter.java | 22 +- .../dht/GridDhtPartitionTopologyImpl.java | 2 + .../distributed/dht/GridDhtTxFinishFuture.java | 12 +- .../distributed/dht/GridDhtTxFinishRequest.java | 89 +++++++- .../continuous/CacheContinuousQueryHandler.java | 30 +-- .../continuous/CacheContinuousQueryManager.java | 3 - .../cache/transactions/IgniteTxEntry.java | 34 ++- .../cache/transactions/IgniteTxHandler.java | 3 + .../transactions/IgniteTxLocalAdapter.java | 18 +- .../cache/transactions/IgniteTxRemoteEx.java | 7 +- .../continuous/GridContinuousProcessor.java | 3 - .../processors/cache/GridCacheTestEntryEx.java | 8 +- ...acheContinuousQueryFailoverAbstractTest.java | 209 ++++++++++++++----- ...ueryFailoverAtomicPrimaryWriteOrderTest.java | 14 +- ...inuousQueryFailoverAtomicReplicatedTest.java | 3 +- .../CacheContinuousQueryFailoverAtomicTest.java | 39 ---- ...CacheContinuousQueryClientReconnectTest.java | 187 +++++++++++++++++ .../IgniteCacheContinuousQueryClientTest.java | 157 ++++++++++++-- ...cheContinuousQueryClientTxReconnectTest.java | 32 +++ .../IgniteCacheQuerySelfTestSuite.java | 14 +- .../yardstick/cache/CacheEntryEventProbe.java | 156 ++++++++++++++ 24 files changed, 944 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index eb40d20..aa6ea18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -382,7 +382,8 @@ public interface GridCacheEntryEx { @Nullable GridCacheVersion explicitVer, @Nullable UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer + @Nullable GridCacheVersion dhtVer, + @Nullable Long updateIdx ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** @@ -419,7 +420,8 @@ public interface GridCacheEntryEx { @Nullable GridCacheVersion explicitVer, @Nullable UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer + @Nullable GridCacheVersion dhtVer, + @Nullable Long updatePartIdx ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** @@ -1016,4 +1018,4 @@ public interface GridCacheEntryEx { * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition. */ public void onUnlock(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index d23bdf2..2c3bf8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1060,7 +1060,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable GridCacheVersion explicitVer, @Nullable UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer + @Nullable GridCacheVersion dhtVer, + @Nullable Long updateIdx ) throws IgniteCheckedException, GridCacheEntryRemovedException { CacheObject old; @@ -1158,6 +1159,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updateIdx0 = nextPartIndex(topVer); + if (updateIdx != null && updateIdx != 0) + updateIdx0 = updateIdx; + update(val, expireTime, ttl, newVer); drReplicate(drType, val, newVer); @@ -1183,7 +1187,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme subjId, null, taskName); } - if (!isNear()) + if (!isNear() && + // Ignore events on backups for one phase commit. + !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0)) cctx.continuousQueries().onEntryUpdated(this, key, val, old, tx.local(), false, updateIdx0, topVer); cctx.dataStructures().onEntryUpdated(key, false); @@ -1200,7 +1206,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (intercept) cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0)); - return valid ? new GridCacheUpdateTxResult(true, retval ? old : null) : + return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateIdx0) : new GridCacheUpdateTxResult(false, null); } @@ -1227,7 +1233,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable GridCacheVersion explicitVer, @Nullable UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer + @Nullable GridCacheVersion dhtVer, + @Nullable Long updateIdx ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert cctx.transactional(); @@ -1322,8 +1329,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updateIdx0 = nextPartIndex(topVer); -// if (updateIdx != null) -// updateIdx0 = updateIdx; + if (updateIdx != null && updateIdx != 0) + updateIdx0 = updateIdx; drReplicate(drType, null, newVer); @@ -1357,7 +1364,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme taskName); } - if (!isNear()) + if (!isNear() && + // Ignore events on backups for one phase commit. + !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0)) cctx.continuousQueries().onEntryUpdated(this, key, null, old, tx.local(), false, updateIdx0, topVer); cctx.dataStructures().onEntryUpdated(key, true); @@ -1408,7 +1417,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else ret = old; - return new GridCacheUpdateTxResult(true, ret); + return new GridCacheUpdateTxResult(true, ret, updateIdx0); } else return new GridCacheUpdateTxResult(false, null); @@ -1976,7 +1985,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0); + updateIdx0 == null ? 0 : updateIdx0); } } else @@ -2053,7 +2062,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0); + -1); } } @@ -2101,7 +2110,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0); + updateIdx0 == null ? 0 : updateIdx); } } else @@ -3217,13 +3226,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else if (deletedUnlocked()) deletedUnlocked(false); - drReplicate(drType, val, ver); - - long updateIdx = -1; + long updateIdx = 0; if (!preload) updateIdx = nextPartIndex(topVer); + drReplicate(drType, val, ver); + if (!skipQryNtf) { cctx.continuousQueries().onEntryUpdated(this, key, val, null, true, preload, updateIdx, topVer); @@ -4339,4 +4348,4 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return "IteratorEntry [key=" + key + ']'; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java index ffda7a2..0f63777 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java @@ -32,6 +32,9 @@ public class GridCacheUpdateTxResult { @GridToStringInclude private final CacheObject oldVal; + /** Partition idx. */ + private long partIdx; + /** * Constructor. * @@ -44,6 +47,31 @@ public class GridCacheUpdateTxResult { } /** + * Constructor. + * + * @param success Success flag. + * @param oldVal Old value (if any), + */ + GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal, long partIdx) { + this.success = success; + this.oldVal = oldVal; + this.partIdx = partIdx; + } + + /** + * Sets partition idx. + * + * @param partIdx Partition idx. + */ + public void partIdx(long partIdx) { + this.partIdx = partIdx; + } + + public long partIdx() { + return partIdx; + } + + /** * @return Success flag. */ public boolean success() { @@ -61,4 +89,4 @@ public class GridCacheUpdateTxResult { @Override public String toString() { return S.toString(GridCacheUpdateTxResult.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index fcbf58d..c972f43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -281,6 +281,19 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } } + /** {@inheritDoc} */ + @Override public void setPartitionUpdateIdx(long[] idxs) { + if (writeMap != null && !writeMap.isEmpty() && idxs != null && idxs.length > 0) { + int i = 0; + + for (IgniteTxEntry txEntry : writeMap.values()) { + txEntry.partIdx(idxs[i]); + + ++i; + } + } + } + /** * Adds completed versions to an entry. * @@ -591,7 +604,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter replicate ? DR_BACKUP : DR_NONE, near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + txEntry.partIdx()); else { cached.innerSet(this, eventNodeId(), @@ -609,7 +623,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + txEntry.partIdx()); // Keep near entry up to date. if (nearCached != null) { @@ -638,7 +653,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + txEntry.partIdx()); // Keep near entry up to date. if (nearCached != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 098a60d..4616b17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -302,6 +302,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); + cntrMap.clear(); + // If this is the oldest node. if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) { if (node2part == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 992bd66..96459ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; @@ -376,6 +377,14 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur add(fut); // Append new future. + Collection<Long> updateIdxs = F.transform(dhtMapping.entries(), new C1<IgniteTxEntry, Long>() { + @Override public Long apply(IgniteTxEntry entry) { + assert entry != null; + + return entry.partIdx(); + } + }); + GridDhtTxFinishRequest req = new GridDhtTxFinishRequest( tx.nearNodeId(), futId, @@ -399,7 +408,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.size(), tx.subjectId(), tx.taskNameHash(), - tx.activeCachesDeploymentEnabled()); + tx.activeCachesDeploymentEnabled(), + updateIdxs); req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index caa0aa5..18ac921 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -66,6 +67,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** Check comitted flag. */ private boolean checkCommitted; + /** Partition update counter. */ + @GridToStringInclude + @GridDirectCollection(Long.class) + private GridLongList partUpdateCnt; + /** One phase commit write version. */ private GridCacheVersion writeVer; @@ -163,6 +169,74 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { } /** + * @param nearNodeId Near node ID. + * @param futId Future ID. + * @param miniId Mini future ID. + * @param topVer Topology version. + * @param xidVer Transaction ID. + * @param threadId Thread ID. + * @param commitVer Commit version. + * @param isolation Transaction isolation. + * @param commit Commit flag. + * @param invalidate Invalidate flag. + * @param sys System flag. + * @param sysInvalidate System invalidation flag. + * @param syncCommit Synchronous commit flag. + * @param syncRollback Synchronous rollback flag. + * @param baseVer Base version. + * @param committedVers Committed versions. + * @param rolledbackVers Rolled back versions. + * @param pendingVers Pending versions. + * @param txSize Expected transaction size. + * @param subjId Subject ID. + * @param taskNameHash Task name hash. + * @param updateIdxs Partition update idxs. + */ + public GridDhtTxFinishRequest( + UUID nearNodeId, + IgniteUuid futId, + IgniteUuid miniId, + @NotNull AffinityTopologyVersion topVer, + GridCacheVersion xidVer, + GridCacheVersion commitVer, + long threadId, + TransactionIsolation isolation, + boolean commit, + boolean invalidate, + boolean sys, + byte plc, + boolean sysInvalidate, + boolean syncCommit, + boolean syncRollback, + GridCacheVersion baseVer, + Collection<GridCacheVersion> committedVers, + Collection<GridCacheVersion> rolledbackVers, + Collection<GridCacheVersion> pendingVers, + int txSize, + @Nullable UUID subjId, + int taskNameHash, + Collection<Long> updateIdxs + ) { + this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc, + sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize, + subjId, taskNameHash); + + if (updateIdxs != null && !updateIdxs.isEmpty()) { + partUpdateCnt = new GridLongList(updateIdxs.size()); + + for (Long idx : updateIdxs) + partUpdateCnt.add(idx); + } + } + + /** + * @return Partition update counters. + */ + public GridLongList partUpdateCounters(){ + return partUpdateCnt; + } + + /** * @return Mini ID. */ public IgniteUuid miniId() { @@ -329,6 +403,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { writer.incrementState(); + case 28: + if (!writer.writeMessage("partUpdateCnt", partUpdateCnt)) + return false; + + writer.incrementState(); } return true; @@ -429,6 +508,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); + case 28: + partUpdateCnt = reader.readMessage("partUpdateCnt"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridDhtTxFinishRequest.class); @@ -441,6 +528,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 28; + return 29; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index c537854..14c1b8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -316,10 +316,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } else { - locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); - - if (!skipPrimaryCheck) - sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); + if (!entry.filtered()) + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); } } else { @@ -560,13 +558,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { rec = oldRec; } - Collection<CacheContinuousQueryEntry> entries = rec.collectEntries(e); - - if (CacheContinuousQueryManager.SUPER_DEBUG) - ctx.log(getClass()).error("Fire the following event for partition : " + e.partition() + - " Entries: " + Arrays.toString(entries.toArray())); - - return entries; + return rec.collectEntries(e); } /** @@ -608,9 +600,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { synchronized (pendingEnts) { // Received first event. if (lastFiredEvt == INIT_VALUE) { - if (CacheContinuousQueryManager.SUPER_DEBUG) - log.error("First event. " + entry); - lastFiredEvt = entry.updateIndex(); firedEvents.add(new T2<>(lastFiredEvt, entry)); @@ -624,29 +613,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { lastFiredEvt = 1; - if (CacheContinuousQueryManager.SUPER_DEBUG) - log.error("Lost partition. Start from 1. Entry: " + entry); - firedEvents.add(new T2<>(lastFiredEvt, entry)); return F.asList(entry); } // Check duplicate. - if (entry.updateIndex() > lastFiredEvt) { - if (CacheContinuousQueryManager.SUPER_DEBUG) - log.error("Put message to pending queue. Counter value: " + lastFiredEvt + " Entry: " + entry); - + if (entry.updateIndex() > lastFiredEvt) pendingEnts.put(entry.updateIndex(), entry); - } else { if (log.isDebugEnabled()) log.debug("Skip duplicate continuous query message: " + entry); - if (CacheContinuousQueryManager.SUPER_DEBUG) - log.error("Received duplicate. Counter value: " + lastFiredEvt + " Entry: " + entry - + ", Proceed message " + Arrays.toString(firedEvents.toArray())); - return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 16b40c7..65bb670 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -65,7 +65,6 @@ import static javax.cache.event.EventType.EXPIRED; import static javax.cache.event.EventType.REMOVED; import static javax.cache.event.EventType.UPDATED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; /** @@ -87,8 +86,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** */ private static final long BACKUP_ACK_FREQ = 5000; - public static final boolean SUPER_DEBUG = false; - /** Listeners. */ private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 9eb2808..f5cf501 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -181,6 +181,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { */ private byte flags; + /** Partition update index. */ + private long partIdx; + /** */ private GridCacheVersion serReadVer; @@ -373,6 +376,22 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { } /** + * Sets partition index. + * + * @param partIdx Partition index. + */ + public void partIdx(long partIdx) { + this.partIdx = partIdx; + } + + /** + * @return Partition index. + */ + public long partIdx() { + return partIdx; + } + + /** * @param val Value to set. */ void setAndMarkValid(CacheObject val) { @@ -934,6 +953,11 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { writer.incrementState(); + case 12: + if (!writer.writeLong("partIdx", partIdx)) + return false; + + writer.incrementState(); } return true; @@ -1043,6 +1067,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); + case 12: + partIdx = reader.readLong("partIdx"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(IgniteTxEntry.class); @@ -1055,7 +1087,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 12; + return 13; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index d9786a8..631f9f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -984,6 +984,9 @@ public class IgniteTxHandler { // Complete remote candidates. tx.doneRemote(req.baseVersion(), null, null, null); + tx.setPartitionUpdateIdx( + req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null); + tx.commit(); } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 82e5f2a..6f7ae27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1025,7 +1025,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cached.isNear() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + null); + + if (updRes.success()) + txEntry.partIdx(updRes.partIdx()); if (nearCached != null && updRes.success()) { nearCached.innerSet( @@ -1045,7 +1049,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter null, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + null); } } else if (op == DELETE) { @@ -1063,7 +1068,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cached.isNear() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + null); + + if (updRes.success()) + txEntry.partIdx(updRes.partIdx()); if (nearCached != null && updRes.success()) { nearCached.innerRemove( @@ -1080,7 +1089,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter null, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + null); } } else if (op == RELOAD) { http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java index 9660e4e..845f4f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java @@ -43,4 +43,9 @@ public interface IgniteTxRemoteEx extends IgniteInternalTx { * @return {@code True} if entry was found. */ public boolean setWriteValue(IgniteTxEntry e); -} \ No newline at end of file + + /** + * @param idxs Partition update indexes. + */ + public void setPartitionUpdateIdx(long[] idxs); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index c7676d2..3ed186e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -37,7 +37,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; @@ -74,9 +73,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 9ee6fe7..110b9a6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -479,7 +479,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr @Nullable GridCacheVersion drVer, UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer) + @Nullable GridCacheVersion dhtVer, + @Nullable Long updateIdx) throws IgniteCheckedException, GridCacheEntryRemovedException { return new GridCacheUpdateTxResult(true, rawPut(val, ttl)); } @@ -553,7 +554,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr @Nullable GridCacheVersion drVer, UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer + @Nullable GridCacheVersion dhtVer, + @Nullable Long updatePartIdx ) throws IgniteCheckedException, GridCacheEntryRemovedException { obsoleteVer = ver; @@ -896,4 +898,4 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr @Override public void onUnlock() { // No-op. } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index ca754af..6029761 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; +import javax.cache.CacheException; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; @@ -55,6 +56,7 @@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; @@ -84,6 +86,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -297,7 +300,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo IgniteCache<Object, Object> srvCache = igniteSrv.cache(null); - List<Integer> keys = testKeys(srvCache, 1); + List<Integer> keys = testKeys(srvCache, 3); int keyCnt = keys.size(); @@ -371,6 +374,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo * @throws Exception If failed. */ public void testLeftPrimaryAndBackupNodes() throws Exception { + if (cacheMode() == REPLICATED) + return; + this.backups = 1; final int SRV_NODES = 3; @@ -485,7 +491,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node */) - 1 /** Primary node */ - backups; } - }, 10000L); + }, 5000L); for (; keyIter < keys.size(); keyIter++) { int key = keys.get(keyIter); @@ -560,7 +566,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo final List<T3<Object, Object, Object>> expEvts = new ArrayList<>(); - for (int i = 0; i < SRV_NODES - 1; i++) { + for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) { log.info("Stop iteration: " + i); TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); @@ -654,7 +660,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo * @throws Exception If failed. */ private void checkBackupQueue(int backups, boolean updateFromClient) throws Exception { - this.backups = backups; + this.backups = atomicityMode() == CacheAtomicityMode.ATOMIC ? backups : + backups < 2 ? 2 : backups; final int SRV_NODES = 4; @@ -668,9 +675,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo IgniteCache<Object, Object> qryClientCache = qryClient.cache(null); - if (cacheMode() != REPLICATED) - assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups()); - Affinity<Object> aff = qryClient.affinity(null); CacheEventListener1 lsnr = new CacheEventListener1(false); @@ -687,7 +691,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo List<T3<Object, Object, Object>> expEvts = new ArrayList<>(); - for (int i = 0; i < SRV_NODES - 1; i++) { + for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) { log.info("Stop iteration: " + i); TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); @@ -709,6 +713,39 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo T2<Object, Object> t = updates.get(key); + if (updateFromClient) { + if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) { + try (Transaction tx = qryClient.transactions().txStart()) { + qryClientCache.put(key, key); + + tx.commit(); + } + catch (CacheException | ClusterTopologyException e) { + log.warning("Failed put. [Key=" + key + ", val=" + key + "]"); + + continue; + } + } + else + qryClientCache.put(key, key); + } + else { + if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) { + try (Transaction tx = ignite.transactions().txStart()) { + cache.put(key, key); + + tx.commit(); + } + catch (CacheException | ClusterTopologyException e) { + log.warning("Failed put. [Key=" + key + ", val=" + key + "]"); + + continue; + } + } + else + cache.put(key, key); + } + if (t == null) { updates.put(key, new T2<>((Object)key, null)); @@ -720,11 +757,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo expEvts.add(new T3<>((Object)key, (Object)key, (Object)key)); } - if (updateFromClient) - qryClientCache.put(key, key); - else - cache.put(key, key); - if (first) { spi.skipMsg = true; @@ -747,7 +779,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo checkEvents(expEvts, lsnr); } - for (int i = 0; i < SRV_NODES - 1; i++) { + for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) { log.info("Start iteration: " + i); Ignite ignite = startGrid(i); @@ -782,7 +814,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo cache.put(key, key); } - if (!latch.await(5, SECONDS)) { + if (!latch.await(10, SECONDS)) { Set<Integer> keys0 = new HashSet<>(keys); keys0.removeAll(lsnr.keys); @@ -824,7 +856,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo */ private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr, boolean lostAllow) throws Exception { - boolean b = GridTestUtils.waitForCondition(new PA() { + GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { return expEvts.size() == lsnr.size(); } @@ -910,7 +942,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo for (T3<Object, Object, Object> e : lostEvents) log.error("Lost event: " + e); - assertTrue("Lose events, see log for details.", false); + fail("Lose events, see log for details."); } log.error("Lost event cnt: " + lostEvents.size()); @@ -1155,17 +1187,19 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo * @throws Exception If failed. */ public void testFailover() throws Exception { + this.backups = 2; + final int SRV_NODES = 4; startGridsMultiThreaded(SRV_NODES); client = true; - Ignite qryClient = startGrid(SRV_NODES); + final Ignite qryCln = startGrid(SRV_NODES); client = false; - IgniteCache<Object, Object> qryClientCache = qryClient.cache(null); + final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null); final CacheEventListener2 lsnr = new CacheEventListener2(); @@ -1173,7 +1207,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo qry.setLocalListener(lsnr); - QueryCursor<?> cur = qryClientCache.query(qry); + QueryCursor<?> cur = qryClnCache.query(qry); final AtomicBoolean stop = new AtomicBoolean(); @@ -1194,7 +1228,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo log.info("Stop node: " + idx); - stopGrid(idx); + try { + stopGrid(idx); + } + catch (Exception e) { + log.warning("Failed to stop nodes.", e); + } CountDownLatch latch = new CountDownLatch(1); @@ -1216,9 +1255,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>(); try { - long stopTime = System.currentTimeMillis() + 1 * 60_000; + long stopTime = System.currentTimeMillis() + 60_000; - final int PARTS = qryClient.affinity(null).partitions(); + final int PARTS = qryCln.affinity(null).partitions(); ThreadLocalRandom rnd = ThreadLocalRandom.current(); @@ -1234,17 +1273,51 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo val = val + 1; if (processorPut && prevVal != null) { - qryClientCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() { - @Override public Void process(MutableEntry<Object, Object> entry, - Object... arguments) throws EntryProcessorException { - entry.setValue(arguments[0]); + if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) { + try (Transaction tx = qryCln.transactions().txStart()) { + qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() { + @Override public Void process(MutableEntry<Object, Object> e, + Object... arg) throws EntryProcessorException { + e.setValue(arg[0]); + + return null; + } + }, val); + + tx.commit(); + } + catch (CacheException | ClusterTopologyException e) { + log.warning("Failed put. [Key=" + key + ", val=" + val + "]"); - return null; + continue; } - }, val); + } + else + qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() { + @Override public Void process(MutableEntry<Object, Object> e, + Object... arg) throws EntryProcessorException { + e.setValue(arg[0]); + + return null; + } + }, val); + } + else { + if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) { + try (Transaction tx = qryCln.transactions().txStart()) { + qryClnCache.put(key, val); + + tx.commit(); + } + catch (CacheException | ClusterTopologyException e) { + log.warning("Failed put. [Key=" + key + ", val=" + val + "]"); + + continue; + } + } + else + qryClnCache.put(key, val); } - else - qryClientCache.put(key, val); processorPut = !processorPut; @@ -1306,11 +1379,14 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo restartFut.get(); - boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return checkEvents(false, expEvts, lsnr); - } - }, 10_000); + boolean check = true; + + if (!expEvts.isEmpty()) + check = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return checkEvents(false, expEvts, lsnr); + } + }, 10_000); if (!check) assertTrue(checkEvents(true, expEvts, lsnr)); @@ -1324,6 +1400,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo * @throws Exception If failed. */ public void testFailoverFilter() throws Exception { + this.backups = 2; + final int SRV_NODES = 4; startGridsMultiThreaded(SRV_NODES); @@ -1385,7 +1463,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>(); try { - long stopTime = System.currentTimeMillis() + 1 * 60_000; + long stopTime = System.currentTimeMillis() + 60_000; final int PARTS = qryClient.affinity(null).partitions(); @@ -1510,15 +1588,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** * @throws Exception If failed. */ - public void testFailoverStartStopOneBackup() throws Exception { - failoverStartStopFilter(1); + public void testFailoverStartStopBackup() throws Exception { + failoverStartStopFilter(atomicityMode() == CacheAtomicityMode.ATOMIC ? 1 : 2); } /** * @throws Exception If failed. */ - public void _testStartStop() throws Exception { - this.backups = 0; + public void testStartStop() throws Exception { + this.backups = 2; final int SRV_NODES = 4; @@ -1532,6 +1610,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo IgniteCache<Object, Object> qryClnCache = qryClient.cache(null); + Affinity<Object> aff = qryClient.affinity(null); + final CacheEventListener2 lsnr = new CacheEventListener2(); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -1542,18 +1622,18 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo QueryCursor<?> cur = qryClnCache.query(qry); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 20; i++) { final int idx = i % (SRV_NODES - 1); log.info("Stop node: " + idx); stopGrid(idx); - Thread.sleep(200); + awaitPartitionMapExchange(); List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>(); - for (int j = 0; j < 10; j++) { + for (int j = 0; j < aff.partitions(); j++) { Integer oldVal = (Integer)qryClnCache.get(j); qryClnCache.put(j, i); @@ -1646,7 +1726,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>(); try { - long stopTime = System.currentTimeMillis() + 60_000; + long stopTime = System.currentTimeMillis() + 10_000; // Start new filter each 5 sec. long startFilterTime = System.currentTimeMillis() + 5_000; @@ -1785,13 +1865,11 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo dinLsnr.evts.clear(); dinLsnr.vals.clear(); - - dinQry.close(); } List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>(); - for (int i = 0; i < 1024; i++) { + for (int i = 0; i < qryClient.affinity(null).partitions(); i++) { Integer oldVal = (Integer)qryClnCache.get(i); qryClnCache.put(i, i); @@ -1801,12 +1879,13 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo checkEvents(new ArrayList<>(afterRestEvents), lsnr, false); - //checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false); - cur.close(); - if (dinQry != null) + if (dinQry != null) { + checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false); + dinQry.close(); + } assertFalse("Unexpected error during test, see log for details.", err); } @@ -1815,6 +1894,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo * @throws Exception If failed. */ public void testMultiThreaded() throws Exception { + this.backups = 2; + final int SRV_NODES = 3; startGridsMultiThreaded(SRV_NODES); @@ -1957,8 +2038,24 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo T2<Integer, Integer> expEvt = exp.get(i); CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i); - assertEquals(key, rcvdEvt.getKey()); - assertEquals(expEvt.get1(), rcvdEvt.getValue()); + if (pass) { + assertEquals(key, rcvdEvt.getKey()); + assertEquals(expEvt.get1(), rcvdEvt.getValue()); + } + else { + if (!key.equals(rcvdEvt.getKey()) || !expEvt.get1().equals(rcvdEvt.getValue())) + log.warning("Missed events. [key=" + key + ", actKey=" + rcvdEvt.getKey() + + ", expVal=" + expEvt.get1() + ", actVal=" + rcvdEvt.getValue() + "]"); + } + } + + if (!pass) { + for (int i = cnt; i < exp.size(); i++) { + T2<Integer, Integer> val = exp.get(i); + + log.warning("Missed events. [key=" + key + ", expVal=" + val.get1() + + ", prevVal=" + val.get2() + "]"); + } } } } @@ -2168,7 +2265,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo if (msg0 instanceof GridContinuousMessage) { if (skipMsg) { - log.info("Skip continuous message: " + msg0); + if (log.isDebugEnabled()) + log.debug("Skip continuous message: " + msg0); return; } @@ -2176,7 +2274,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo AtomicBoolean sndFirstOnly = this.sndFirstOnly; if (sndFirstOnly != null && !sndFirstOnly.compareAndSet(false, true)) { - log.info("Skip continuous message: " + msg0); + if (log.isDebugEnabled()) + log.debug("Skip continuous message: " + msg0); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java index 4ddcf0d..8bd7ea7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java @@ -18,15 +18,27 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; /** * */ -public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest extends CacheContinuousQueryFailoverAtomicTest { +public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest extends CacheContinuousQueryFailoverAbstractTest { /** {@inheritDoc} */ @Override protected CacheAtomicWriteOrderMode writeOrderMode() { return PRIMARY; } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java index 8fc58d3..db5b8cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java @@ -26,7 +26,8 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED; /** * */ -public class CacheContinuousQueryFailoverAtomicReplicatedTest extends CacheContinuousQueryFailoverAtomicTest { +public class CacheContinuousQueryFailoverAtomicReplicatedTest + extends CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest { /** {@inheritDoc} */ @Override protected CacheMode cacheMode() { return REPLICATED; http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java deleted file mode 100644 index fb50387..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMode; - -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; - -/** - * - */ -public class CacheContinuousQueryFailoverAtomicTest extends CacheContinuousQueryFailoverAbstractTest { - /** {@inheritDoc} */ - @Override protected CacheMode cacheMode() { - return PARTITIONED; - } - - /** {@inheritDoc} */ - @Override protected CacheAtomicityMode atomicityMode() { - return ATOMIC; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java new file mode 100644 index 0000000..560f2e0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.concurrent.CountDownLatch; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteClientReconnectAbstractTest; +import org.apache.ignite.resources.LoggerResource; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheContinuousQueryClientReconnectTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(atomicMode()); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @return Atomic mode. + */ + protected CacheAtomicityMode atomicMode() { + return ATOMIC; + } + + /** + * @throws Exception If failed. + */ + public void testReconnectClient() throws Exception { + Ignite client = grid(serverCount()); + + Ignite srv = clientRouter(client); + + assertTrue(client.cluster().localNode().isClient()); + + final CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + IgniteCache<Object, Object> clnCache = client.cache(null); + + QueryCursor<?> cur = clnCache.query(qry); + + int keyCnt = 100; + + for (int i = 0; i < 30; i++) { + lsnr.latch = new CountDownLatch(keyCnt); + + for (int key = 0; key < keyCnt; key++) + clnCache.put(key, key); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + reconnectClientNode(client, srv, null); + + lsnr.latch = new CountDownLatch(keyCnt); + + for (int key = 0; key < keyCnt; key++) + clnCache.put(key, key); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + } + + cur.close(); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectClientAndLeftRouter() throws Exception { + Ignite client = grid(serverCount()); + + final Ignite srv = clientRouter(client); + + final String clnRouterName = srv.name(); + + assertTrue(client.cluster().localNode().isClient()); + + final CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + IgniteCache<Object, Object> clnCache = client.cache(null); + + QueryCursor<?> cur = clnCache.query(qry); + + int keyCnt = 100; + + lsnr.latch = new CountDownLatch(keyCnt); + + for (int key = 0; key < keyCnt; key++) + clnCache.put(key, key); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + stopGrid(clnRouterName); + } + }); + + assertFalse("Client connected to the same server node.", clnRouterName.equals(clientRouter(client).name())); + + lsnr.latch = new CountDownLatch(keyCnt); + + for (int key = 0; key < keyCnt; key++) + clnCache.put(key, key); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + cur.close(); + } + + /** + * + */ + private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> { + /** */ + private volatile CountDownLatch latch = new CountDownLatch(1); + + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) { + log.info("Received cache event: " + evt); + + latch.countDown(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java index 1afeb05..534f298 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java @@ -27,11 +27,13 @@ import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static java.util.concurrent.TimeUnit.SECONDS; @@ -83,11 +85,13 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest client = true; - Ignite clientNode = startGrid(3); + final int CLIENT_ID = 3; + + Ignite clientNode = startGrid(CLIENT_ID); client = false; - CacheEventListener lsnr = new CacheEventListener(); + final CacheEventListener lsnr = new CacheEventListener(); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -95,27 +99,154 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest QueryCursor<?> cur = clientNode.cache(null).query(qry); - Ignite joined1 = startGrid(4); + for (int i = 0; i < 10; i++) { + log.info("Start iteration: " + i); + + lsnr.latch = new CountDownLatch(1); + + Ignite joined1 = startGrid(4); + + IgniteCache<Object, Object> joinedCache1 = joined1.cache(null); + + joinedCache1.put(primaryKey(joinedCache1), 1); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + lsnr.latch = new CountDownLatch(1); + + Ignite joined2 = startGrid(5); + + IgniteCache<Object, Object> joinedCache2 = joined2.cache(null); - IgniteCache<Object, Object> joinedCache1 = joined1.cache(null); + joinedCache2.put(primaryKey(joinedCache2), 2); - joinedCache1.put(primaryKey(joinedCache1), 1); + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); - assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + stopGrid(4); + + stopGrid(5); + } cur.close(); + } + + /** + * @throws Exception If failed. + */ + public void testNodeJoinsRestartQuery() throws Exception { + startGrids(2); + + client = true; + + final int CLIENT_ID = 3; + + Ignite clientNode = startGrid(CLIENT_ID); + + client = false; + + for (int i = 0; i < 10; i++) { + log.info("Start iteration: " + i); + + final CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = clientNode.cache(null).query(qry); + + lsnr.latch = new CountDownLatch(1); + + Ignite joined1 = startGrid(4); + + IgniteCache<Object, Object> joinedCache1 = joined1.cache(null); + + joinedCache1.put(primaryKey(joinedCache1), 1); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + cur.close(); - lsnr.latch = new CountDownLatch(1); + lsnr.latch = new CountDownLatch(1); - Ignite joined2 = startGrid(5); + Ignite joined2 = startGrid(5); - IgniteCache<Object, Object> joinedCache2 = joined2.cache(null); + IgniteCache<Object, Object> joinedCache2 = joined2.cache(null); - joinedCache2.put(primaryKey(joinedCache2), 2); + joinedCache2.put(primaryKey(joinedCache2), 2); - U.sleep(1000); + assertFalse("Unexpected event received.", GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return 1 != lsnr.latch.getCount(); + } + }, 1000)); - assertEquals("Unexpected event received.", 1, lsnr.latch.getCount()); + stopGrid(4); + + stopGrid(5); + } + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeLeft() throws Exception { + startGrids(3); + + client = true; + + final int CLIENT_ID = 3; + + Ignite clnNode = startGrid(CLIENT_ID); + + client = false; + + IgniteOutClosure<IgniteCache<Integer, Integer>> rndCache = + new IgniteOutClosure<IgniteCache<Integer, Integer>>() { + int cnt = 0; + + @Override public IgniteCache<Integer, Integer> apply() { + ++cnt; + + return grid(CLIENT_ID).cache(null); + } + }; + + final CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = clnNode.cache(null).query(qry); + + boolean first = true; + + int keyCnt = 1; + + for (int i = 0; i < 10; i++) { + log.info("Start iteration: " + i); + + if (first) + first = false; + else { + for (int srv = 0; srv < CLIENT_ID - 1; srv++) + startGrid(srv); + } + + lsnr.latch = new CountDownLatch(keyCnt); + + for (int key = 0; key < keyCnt; key++) + rndCache.apply().put(key, key); + + assertTrue("Failed to wait for event. Left events: " + lsnr.latch.getCount(), + lsnr.latch.await(10, SECONDS)); + + for (int srv = 0; srv < CLIENT_ID - 1; srv++) + stopGrid(srv); + } + + cur.close(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java new file mode 100644 index 0000000..a10ebc9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * + */ +public class IgniteCacheContinuousQueryClientTxReconnectTest extends IgniteCacheContinuousQueryClientReconnectTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicMode() { + return TRANSACTIONAL; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 6cb1a52..91dc388 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -65,6 +65,10 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest; import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest; @@ -77,7 +81,9 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest; import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest; +import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest; import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest; import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest; import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest; @@ -160,8 +166,14 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class); - suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class); suite.addTestSuite(GridCacheContinuousQueryReplicatedOneNodeSelfTest.class); + suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class); + suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class); + suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class); + suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.class); + suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedTest.class); + suite.addTestSuite(CacheContinuousQueryFailoverTxTest.class); + suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedTest.class); // Reduce fields queries. suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);
