ignite-801: fixed IgniteQueue failover tests
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/97c859d0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/97c859d0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/97c859d0 Branch: refs/heads/ignite-801 Commit: 97c859d0af699b0f7c878126911f6d061ce825c0 Parents: 5ea234c Author: Denis Magda <[email protected]> Authored: Tue Oct 27 16:32:13 2015 +0300 Committer: Denis Magda <[email protected]> Committed: Tue Oct 27 16:32:13 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 12 ++++++++ .../CacheDataStructuresManager.java | 1 + .../dht/GridPartitionedGetFuture.java | 3 +- .../distributed/near/GridNearGetFuture.java | 2 +- .../datastructures/DataStructuresProcessor.java | 1 + .../datastructures/GridCacheQueueAdapter.java | 32 ++++++++++++-------- .../ignite/spi/discovery/DiscoverySpi.java | 9 +++++- ...eAbstractDataStructuresFailoverSelfTest.java | 10 +++++- 8 files changed, 53 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 6aba211..fb2efe2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -102,6 +102,7 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.plugin.segmentation.SegmentationPolicy; +import org.apache.ignite.spi.IgniteSpi; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; import org.apache.ignite.spi.discovery.DiscoverySpi; @@ -1767,6 +1768,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * Failure detection timeout used by discovery SPI. If the timeout is disabled then a value of the + * network timeout is returned. + * + * @return . + */ + public long failureDetectionTimeout() { + return getSpi().failureDetectionTimeoutEnabled() ? ctx.config().getFailureDetectionTimeout() : + ctx.config().getNetworkTimeout(); + } + + /** * Updates topology version if current version is smaller than updated. * * @param updated Updated topology version. http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 930921b..ac90efc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.datastructures.GridTransactionalCac import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index a68e834..e0e8f6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -710,8 +710,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M final AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); + final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, - cctx.kernalContext().config().getNetworkTimeout(), + cctx.discovery().failureDetectionTimeout(), updTopVer, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index eca2f71..1fb4c95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -845,7 +845,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, - cctx.kernalContext().config().getNetworkTimeout(), + cctx.discovery().failureDetectionTimeout(), updTopVer, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 0f2c7a1..11f6134 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.lang.IgniteClosureX; import org.apache.ignite.internal.util.lang.IgniteInClosureX; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index 0e4aebc..0843eac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -169,14 +169,22 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp @SuppressWarnings("unchecked") @Nullable @Override public T peek() throws IgniteException { try { - GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); + while (true) { + GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); - checkRemoved(hdr); + checkRemoved(hdr); - if (hdr.empty()) - return null; + if (hdr.empty()) + return null; + + T val = (T)cache.get(itemKey(hdr.head())); - return (T)cache.get(itemKey(hdr.head())); + if (val == null) + // Header might have been polled. Retry. + continue; + + return val; + } } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -416,8 +424,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp long startIdx, long endIdx, int batchSize) - throws IgniteCheckedException - { + throws IgniteCheckedException { Set<GridCacheQueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10); for (long idx = startIdx; idx < endIdx; idx++) { @@ -435,8 +442,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp } /** - * Checks result of closure modifying queue header, throws {@link IllegalStateException} - * if queue was removed. + * Checks result of closure modifying queue header, throws {@link IllegalStateException} if queue was removed. * * @param idx Result of closure execution. */ @@ -529,7 +535,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp */ protected abstract void removeItem(long rmvIdx) throws IgniteCheckedException; - /** * @param idx Item index. * @return Item key. @@ -816,7 +821,8 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp } next++; - } while (next != hdr.tail()); + } + while (next != hdr.tail()); GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), hdr.capacity(), @@ -1036,7 +1042,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp if (o == null || getClass() != o.getClass()) return false; - GridCacheQueueAdapter that = (GridCacheQueueAdapter) o; + GridCacheQueueAdapter that = (GridCacheQueueAdapter)o; return id.equals(that.id); @@ -1051,4 +1057,4 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp @Override public String toString() { return S.toString(GridCacheQueueAdapter.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 612c1f1..baa26d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -164,4 +164,11 @@ public interface DiscoverySpi extends IgniteSpi { * @throws IllegalStateException If discovery SPI has not started. */ public boolean isClientMode() throws IllegalStateException; -} \ No newline at end of file + + /** + * Checks whether failure detection timeout is enabled for the discovery SPI. + * + * @return {@code true} if enabled, {@code false} otherwise. + */ + public boolean failureDetectionTimeoutEnabled(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/97c859d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 185460c..6e91107 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -449,7 +449,15 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { @Override public Object apply(Ignite ignite) { - assert ignite.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0; + IgniteQueue<Integer> queue = ignite.queue(STRUCTURE_NAME, 0, null); + + assertNotNull(queue); + + Integer val = queue.peek(); + + assertNotNull(val); + + assert val > 0; return null; }
