IGNITE-801: fixes and improvements after the first review

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d26377a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d26377a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d26377a

Branch: refs/heads/ignite-801
Commit: 7d26377ab99b3e2ce01aeb812e14f6f8d4ddd2bc
Parents: 1635748
Author: Denis Magda <[email protected]>
Authored: Fri Nov 20 10:37:07 2015 +0300
Committer: Denis Magda <[email protected]>
Committed: Fri Nov 20 10:37:07 2015 +0300

----------------------------------------------------------------------
 examples/schema-import/pom.xml                  |  13 +-
 .../processors/cache/GridCacheUtils.java        |   7 +-
 .../GridAtomicCacheQueueImpl.java               | 125 +++----------------
 ...eAbstractDataStructuresFailoverSelfTest.java |  48 ++++++-
 4 files changed, 79 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d26377a/examples/schema-import/pom.xml
----------------------------------------------------------------------
diff --git a/examples/schema-import/pom.xml b/examples/schema-import/pom.xml
index 32ce869..5bea512 100644
--- a/examples/schema-import/pom.xml
+++ b/examples/schema-import/pom.xml
@@ -20,10 +20,7 @@
 <!--
     POM file.
 -->
-<project
-    xmlns="http://maven.apache.org/POM/4.0.0";
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -91,6 +88,14 @@
                     <target>1.7</target>
                 </configuration>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d26377a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 35e5803..5c4e564 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1788,7 +1788,7 @@ public class GridCacheUtils {
                     try {
                         return c.call();
                     }
-                    catch (ClusterGroupEmptyCheckedException e) {
+                    catch (ClusterGroupEmptyCheckedException | 
ClusterTopologyServerNotFoundException e) {
                         throw e;
                     }
                     catch (TransactionRollbackException e) {
@@ -1804,6 +1804,11 @@ public class GridCacheUtils {
                         if (X.hasCause(e, 
ClusterTopologyCheckedException.class)) {
                             ClusterTopologyCheckedException topErr = 
e.getCause(ClusterTopologyCheckedException.class);
 
+                            if (topErr instanceof 
ClusterGroupEmptyCheckedException || topErr instanceof
+                                ClusterTopologyServerNotFoundException)
+                                throw e;
+
+                            // IGNITE-1948: remove this check when the issue 
is fixed
                             if (topErr.retryReadyFuture() != null)
                                 topErr.retryReadyFuture().get();
                             else

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d26377a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
index 78aa9b1..b433887 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
@@ -23,8 +23,6 @@ import java.util.Map;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
-import 
org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -57,26 +55,9 @@ public class GridAtomicCacheQueueImpl<T> extends 
GridCacheQueueAdapter<T> {
 
             checkRemoved(idx);
 
-            int cnt = 0;
-
             GridCacheQueueItemKey key = itemKey(idx);
 
-            while (true) {
-                try {
-                    cache.getAndPut(key, item);
-
-                    break;
-                }
-                catch (CachePartialUpdateCheckedException e) {
-                    if (cnt++ == GridCacheAdapter.MAX_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to put queue item, will retry 
[err=" + e + ", idx=" + idx + ']');
-
-                        U.sleep(RETRY_DELAY);
-                    }
-                }
-            }
+            cache.getAndPut(key, item);
 
             return true;
         }
@@ -99,38 +80,18 @@ public class GridAtomicCacheQueueImpl<T> extends 
GridCacheQueueAdapter<T> {
 
                 GridCacheQueueItemKey key = itemKey(idx);
 
-                int cnt = 0;
-
-                long stop = 0;
+                T data = (T)cache.getAndRemove(key);
 
-                while (true) {
-                    try {
-                        T data = (T)cache.getAndRemove(key);
+                if (data != null)
+                    return data;
 
-                        if (data != null)
-                            return data;
+                long stop = U.currentTimeMillis() + RETRY_TIMEOUT;
 
-                        if (stop == 0)
-                            stop = U.currentTimeMillis() + RETRY_TIMEOUT;
+                while (U.currentTimeMillis() < stop) {
+                    data = (T)cache.getAndRemove(key);
 
-                        while (U.currentTimeMillis() < stop ) {
-                            data = (T)cache.getAndRemove(key);
-
-                            if (data != null)
-                                return data;
-                        }
-
-                        break;
-                    }
-                    catch (CachePartialUpdateCheckedException e) {
-                        if (cnt++ == GridCacheAdapter.MAX_RETRIES)
-                            throw e;
-                        else {
-                            U.warn(log, "Failed to remove queue item, will 
retry [err=" + e + ']');
-
-                            U.sleep(RETRY_DELAY);
-                        }
-                    }
+                    if (data != null)
+                        return data;
                 }
 
                 U.warn(log, "Failed to get item, will retry poll [queue=" + 
queueName + ", idx=" + idx + ']');
@@ -162,24 +123,7 @@ public class GridAtomicCacheQueueImpl<T> extends 
GridCacheQueueAdapter<T> {
                 idx++;
             }
 
-            int cnt = 0;
-
-            while (true) {
-                try {
-                    cache.putAll(putMap);
-
-                    break;
-                }
-                catch (CachePartialUpdateCheckedException e) {
-                    if (cnt++ == GridCacheAdapter.MAX_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to add items, will retry [err=" + 
e + ']');
-
-                        U.sleep(RETRY_DELAY);
-                    }
-                }
-            }
+            cache.putAll(putMap);
 
             return true;
         }
@@ -198,34 +142,14 @@ public class GridAtomicCacheQueueImpl<T> extends 
GridCacheQueueAdapter<T> {
 
             GridCacheQueueItemKey key = itemKey(idx);
 
-            int cnt = 0;
+            if (cache.remove(key))
+                return;
 
-            long stop = 0;
-
-            while (true) {
-                try {
-                    if (cache.remove(key))
-                        return;
+            long stop = U.currentTimeMillis() + RETRY_TIMEOUT;
 
-                    if (stop == 0)
-                        stop = U.currentTimeMillis() + RETRY_TIMEOUT;
-
-                    while (U.currentTimeMillis() < stop ) {
-                        if (cache.remove(key))
-                            return;
-                    }
-
-                    break;
-                }
-                catch (CachePartialUpdateCheckedException e) {
-                    if (cnt++ == GridCacheAdapter.MAX_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to add items, will retry [err=" + 
e + ']');
-
-                        U.sleep(RETRY_DELAY);
-                    }
-                }
+            while (U.currentTimeMillis() < stop) {
+                if (cache.remove(key))
+                    return;
             }
 
             U.warn(log, "Failed to remove item, [queue=" + queueName + ", 
idx=" + idx + ']');
@@ -240,21 +164,6 @@ public class GridAtomicCacheQueueImpl<T> extends 
GridCacheQueueAdapter<T> {
     @SuppressWarnings("unchecked")
     @Nullable private Long 
transformHeader(EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, 
Long> c)
         throws IgniteCheckedException {
-        int cnt = 0;
-
-        while (true) {
-            try {
-                return (Long)cache.invoke(queueKey, c).get();
-            }
-            catch (CachePartialUpdateCheckedException e) {
-                if (cnt++ == GridCacheAdapter.MAX_RETRIES)
-                    throw e;
-                else {
-                    U.warn(log, "Failed to update queue header, will retry 
[err=" + e + ']');
-
-                    U.sleep(RETRY_DELAY);
-                }
-            }
-        }
+        return (Long)cache.invoke(queueKey, c).get();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d26377a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 80e151c..18b82d0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.cache.datastructures;
 
 import java.util.Collection;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
@@ -32,22 +34,25 @@ import org.apache.ignite.IgniteAtomicReference;
 import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.typedef.CA;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -74,6 +79,9 @@ public abstract class 
GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     /** */
     private static final int TOP_CHANGE_THREAD_CNT = 3;
 
+    /** */
+    private boolean client;
+
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return TEST_TIMEOUT;
@@ -126,12 +134,50 @@ public abstract class 
GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
 
         cfg.setCacheConfiguration(ccfg);
 
+        if (client) {
+            cfg.setClientMode(client);
+            
((TcpDiscoverySpi)(cfg.getDiscoverySpi())).setForceServerMode(true);
+        }
+
         return cfg;
     }
 
     /**
      * @throws Exception If failed.
      */
+    public void testAtomicLongFailsWhenServersLeft() throws Exception {
+        client = true;
+
+        Ignite ignite = startGrid(gridCount());
+
+        new Timer().schedule(new TimerTask() {
+            @Override public void run() {
+                for (int i = 0; i < gridCount(); i++)
+                    stopGrid(i);
+            }
+        }, 10_000);
+
+        long stopTime = U.currentTimeMillis() + TEST_TIMEOUT / 2;
+
+        IgniteAtomicLong atomic = ignite.atomicLong(STRUCTURE_NAME, 10, true);
+
+        try {
+            while (U.currentTimeMillis() < stopTime)
+                assertEquals(10, atomic.get());
+        }
+        catch (IgniteException e) {
+            if (X.hasCause(e, ClusterTopologyServerNotFoundException.class))
+                return;
+
+            throw e;
+        }
+
+        fail();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicLongTopologyChange() throws Exception {
         try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, 
true)) {
             Ignite g = startGrid(NEW_GRID_NAME);

Reply via email to