IGNITE-801: fixes and improvements after the first review
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d26377a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d26377a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d26377a Branch: refs/heads/ignite-801 Commit: 7d26377ab99b3e2ce01aeb812e14f6f8d4ddd2bc Parents: 1635748 Author: Denis Magda <[email protected]> Authored: Fri Nov 20 10:37:07 2015 +0300 Committer: Denis Magda <[email protected]> Committed: Fri Nov 20 10:37:07 2015 +0300 ---------------------------------------------------------------------- examples/schema-import/pom.xml | 13 +- .../processors/cache/GridCacheUtils.java | 7 +- .../GridAtomicCacheQueueImpl.java | 125 +++---------------- ...eAbstractDataStructuresFailoverSelfTest.java | 48 ++++++- 4 files changed, 79 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7d26377a/examples/schema-import/pom.xml ---------------------------------------------------------------------- diff --git a/examples/schema-import/pom.xml b/examples/schema-import/pom.xml index 32ce869..5bea512 100644 --- a/examples/schema-import/pom.xml +++ b/examples/schema-import/pom.xml @@ -20,10 +20,7 @@ <!-- POM file. --> -<project - xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> @@ -91,6 +88,14 @@ <target>1.7</target> </configuration> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/ignite/blob/7d26377a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 35e5803..5c4e564 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1788,7 +1788,7 @@ public class GridCacheUtils { try { return c.call(); } - catch (ClusterGroupEmptyCheckedException e) { + catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) { throw e; } catch (TransactionRollbackException e) { @@ -1804,6 +1804,11 @@ public class GridCacheUtils { if (X.hasCause(e, ClusterTopologyCheckedException.class)) { ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); + if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof + ClusterTopologyServerNotFoundException) + throw e; + + // IGNITE-1948: remove this check when the issue is fixed if (topErr.retryReadyFuture() != null) topErr.retryReadyFuture().get(); else http://git-wip-us.apache.org/repos/asf/ignite/blob/7d26377a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java index 78aa9b1..b433887 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java @@ -23,8 +23,6 @@ import java.util.Map; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; @@ -57,26 +55,9 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { checkRemoved(idx); - int cnt = 0; - GridCacheQueueItemKey key = itemKey(idx); - while (true) { - try { - cache.getAndPut(key, item); - - break; - } - catch (CachePartialUpdateCheckedException e) { - if (cnt++ == GridCacheAdapter.MAX_RETRIES) - throw e; - else { - U.warn(log, "Failed to put queue item, will retry [err=" + e + ", idx=" + idx + ']'); - - U.sleep(RETRY_DELAY); - } - } - } + cache.getAndPut(key, item); return true; } @@ -99,38 +80,18 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { GridCacheQueueItemKey key = itemKey(idx); - int cnt = 0; - - long stop = 0; + T data = (T)cache.getAndRemove(key); - while (true) { - try { - T data = (T)cache.getAndRemove(key); + if (data != null) + return data; - if (data != null) - return data; + long stop = U.currentTimeMillis() + RETRY_TIMEOUT; - if (stop == 0) - stop = U.currentTimeMillis() + RETRY_TIMEOUT; + while (U.currentTimeMillis() < stop) { + data = (T)cache.getAndRemove(key); - while (U.currentTimeMillis() < stop ) { - data = (T)cache.getAndRemove(key); - - if (data != null) - return data; - } - - break; - } - catch (CachePartialUpdateCheckedException e) { - if (cnt++ == GridCacheAdapter.MAX_RETRIES) - throw e; - else { - U.warn(log, "Failed to remove queue item, will retry [err=" + e + ']'); - - U.sleep(RETRY_DELAY); - } - } + if (data != null) + return data; } U.warn(log, "Failed to get item, will retry poll [queue=" + queueName + ", idx=" + idx + ']'); @@ -162,24 +123,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { idx++; } - int cnt = 0; - - while (true) { - try { - cache.putAll(putMap); - - break; - } - catch (CachePartialUpdateCheckedException e) { - if (cnt++ == GridCacheAdapter.MAX_RETRIES) - throw e; - else { - U.warn(log, "Failed to add items, will retry [err=" + e + ']'); - - U.sleep(RETRY_DELAY); - } - } - } + cache.putAll(putMap); return true; } @@ -198,34 +142,14 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { GridCacheQueueItemKey key = itemKey(idx); - int cnt = 0; + if (cache.remove(key)) + return; - long stop = 0; - - while (true) { - try { - if (cache.remove(key)) - return; + long stop = U.currentTimeMillis() + RETRY_TIMEOUT; - if (stop == 0) - stop = U.currentTimeMillis() + RETRY_TIMEOUT; - - while (U.currentTimeMillis() < stop ) { - if (cache.remove(key)) - return; - } - - break; - } - catch (CachePartialUpdateCheckedException e) { - if (cnt++ == GridCacheAdapter.MAX_RETRIES) - throw e; - else { - U.warn(log, "Failed to add items, will retry [err=" + e + ']'); - - U.sleep(RETRY_DELAY); - } - } + while (U.currentTimeMillis() < stop) { + if (cache.remove(key)) + return; } U.warn(log, "Failed to remove item, [queue=" + queueName + ", idx=" + idx + ']'); @@ -240,21 +164,6 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { @SuppressWarnings("unchecked") @Nullable private Long transformHeader(EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long> c) throws IgniteCheckedException { - int cnt = 0; - - while (true) { - try { - return (Long)cache.invoke(queueKey, c).get(); - } - catch (CachePartialUpdateCheckedException e) { - if (cnt++ == GridCacheAdapter.MAX_RETRIES) - throw e; - else { - U.warn(log, "Failed to update queue header, will retry [err=" + e + ']'); - - U.sleep(RETRY_DELAY); - } - } - } + return (Long)cache.invoke(queueKey, c).get(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7d26377a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 80e151c..18b82d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.cache.datastructures; import java.util.Collection; +import java.util.Timer; +import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.Callable; @@ -32,22 +34,25 @@ import org.apache.ignite.IgniteAtomicReference; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteQueue; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.AtomicConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.typedef.CA; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -74,6 +79,9 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** */ private static final int TOP_CHANGE_THREAD_CNT = 3; + /** */ + private boolean client; + /** {@inheritDoc} */ @Override protected long getTestTimeout() { return TEST_TIMEOUT; @@ -126,12 +134,50 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig cfg.setCacheConfiguration(ccfg); + if (client) { + cfg.setClientMode(client); + ((TcpDiscoverySpi)(cfg.getDiscoverySpi())).setForceServerMode(true); + } + return cfg; } /** * @throws Exception If failed. */ + public void testAtomicLongFailsWhenServersLeft() throws Exception { + client = true; + + Ignite ignite = startGrid(gridCount()); + + new Timer().schedule(new TimerTask() { + @Override public void run() { + for (int i = 0; i < gridCount(); i++) + stopGrid(i); + } + }, 10_000); + + long stopTime = U.currentTimeMillis() + TEST_TIMEOUT / 2; + + IgniteAtomicLong atomic = ignite.atomicLong(STRUCTURE_NAME, 10, true); + + try { + while (U.currentTimeMillis() < stopTime) + assertEquals(10, atomic.get()); + } + catch (IgniteException e) { + if (X.hasCause(e, ClusterTopologyServerNotFoundException.class)) + return; + + throw e; + } + + fail(); + } + + /** + * @throws Exception If failed. + */ public void testAtomicLongTopologyChange() throws Exception { try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) { Ignite g = startGrid(NEW_GRID_NAME);
