This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 71d0eb8  IGNITE-11670 Java thin client: Fixed queries inconsistency on 
failover - Fixes #6563
71d0eb8 is described below

commit 71d0eb830145100d799eba5e56ed94ead1c92f4e
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Mon May 27 16:01:09 2019 +0300

    IGNITE-11670 Java thin client: Fixed queries inconsistency on failover - 
Fixes #6563
---
 .../ClientReconnectedException.java}               |  27 ++--
 .../internal/client/thin/ClientQueryCursor.java    |  26 +++-
 .../internal/client/thin/GenericQueryPager.java    |  30 +++--
 .../ignite/internal/client/thin/QueryPager.java    |   5 +
 .../test/java/org/apache/ignite/client/Config.java |   8 +-
 .../apache/ignite/client/LocalIgniteCluster.java   |  16 +--
 .../org/apache/ignite/client/ReliabilityTest.java  | 141 ++++++++++++---------
 7 files changed, 143 insertions(+), 110 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
 
b/modules/core/src/main/java/org/apache/ignite/client/ClientReconnectedException.java
similarity index 59%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
copy to 
modules/core/src/main/java/org/apache/ignite/client/ClientReconnectedException.java
index e985689..4153f7e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/client/ClientReconnectedException.java
@@ -15,25 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin;
-
-import java.util.Collection;
-import org.apache.ignite.client.ClientException;
+package org.apache.ignite.client;
 
 /**
- * Extracts paged data
+ * Indicates that previous connection was lost and a new connection 
established,
+ * which can lead to inconsistency of non-atomic operations.
  */
-interface QueryPager<T> extends AutoCloseable {
+public class ClientReconnectedException extends ClientException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
     /**
-     * Reads next page. Call {@link this#hasNext()} to check if there is data 
to read before calling this method.
+     * Default constructor.
      */
-    public Collection<T> next() throws ClientException;
+    public ClientReconnectedException() {
+    }
 
     /**
-     * @return {@code true} if there are more pages to read; {@code false} 
otherwise.
+     * Constructs a new exception with the specified message.
      */
-    public boolean hasNext();
-
-    /** Indicates if initial query response was received. */
-    public boolean hasFirstPage();
+    public ClientReconnectedException(String msg) {
+        super(msg);
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryCursor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryCursor.java
index cdf94f0..82fbac8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryCursor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryCursor.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientReconnectedException;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -39,12 +40,20 @@ class ClientQueryCursor<T> implements QueryCursor<T> {
 
     /** {@inheritDoc} */
     @Override public List<T> getAll() {
-        List<T> res = new ArrayList<>();
+        while (true) {
+            try {
+                List<T> res = new ArrayList<>();
 
-        for (T ent : this)
-            res.add(ent);
+                for (T ent : this)
+                    res.add(ent);
 
-        return res;
+                return res;
+            }
+            catch (ClientReconnectedException ex) {
+                // If we were reconnected to a new server we can retry entire 
query to failover.
+                pager.reset();
+            }
+        }
     }
 
     /** {@inheritDoc} */
@@ -85,10 +94,13 @@ class ClientQueryCursor<T> implements QueryCursor<T> {
 
                     return currPageIt;
                 }
-                catch (ClientException e) {
-                    throw e;
-                }
                 catch (Exception e) {
+                    if (e instanceof ClientException)
+                        throw (ClientException)e;
+
+                    if (e instanceof ClientError)
+                        throw (ClientError)e;
+
                     throw new ClientException("Failed to retrieve query 
results", e);
                 }
             }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
index ce15cae..30c73cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
@@ -19,11 +19,11 @@ package org.apache.ignite.internal.client.thin;
 
 import java.util.Collection;
 import java.util.function.Consumer;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientReconnectedException;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
 import org.apache.ignite.internal.processors.platform.client.ClientStatus;
-import org.apache.ignite.client.ClientException;
-import org.apache.ignite.client.ClientConnectionException;
 
 /**
  * Generic query pager. Override {@link this#readResult(BinaryInputStream)} to 
make it specific.
@@ -88,6 +88,15 @@ abstract class GenericQueryPager<T> implements QueryPager<T> 
{
         return hasFirstPage;
     }
 
+    /** {@inheritDoc} */
+    @Override public void reset() {
+        hasFirstPage = false;
+
+        hasNext = true;
+
+        cursorId = null;
+    }
+
     /**
      * Override this method to read entries from the input stream. "Entries" 
means response data excluding heading
      * cursor ID and trailing "has next page" flag.
@@ -119,21 +128,18 @@ abstract class GenericQueryPager<T> implements 
QueryPager<T> {
         return res;
     }
 
-    /** Get page with failover. */
+    /** Get page. */
     private Collection<T> queryPage() throws ClientException {
         try {
             return ch.service(pageQryOp, req -> req.writeLong(cursorId), 
this::readResult);
         }
         catch (ClientServerError ex) {
-            if (ex.getCode() != ClientStatus.RESOURCE_DOES_NOT_EXIST)
-                throw ex;
-        }
-        catch (ClientConnectionException ignored) {
-        }
-
-        // Retry entire query to failover
-        hasFirstPage = false;
+            if (ex.getCode() == ClientStatus.RESOURCE_DOES_NOT_EXIST) {
+                throw new ClientReconnectedException("Client was reconnected 
in the middle of results fetch, " +
+                    "query results can be inconsistent, please retry the 
query.");
+            }
 
-        return ch.service(qryOp, qryWriter, this::readResult);
+            throw ex;
+        }
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
index e985689..5b90b5c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
@@ -36,4 +36,9 @@ interface QueryPager<T> extends AutoCloseable {
 
     /** Indicates if initial query response was received. */
     public boolean hasFirstPage();
+
+    /**
+     * Reset query pager.
+     */
+    public void reset();
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/client/Config.java 
b/modules/core/src/test/java/org/apache/ignite/client/Config.java
index 3e8e0e1..c279e8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/Config.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/Config.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.client;
 
-import java.net.InetSocketAddress;
-import java.util.Collections;
 import java.util.UUID;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -34,12 +32,10 @@ public class Config {
     /** Name of the cache created by default in the cluster. */
     public static final String DEFAULT_CACHE_NAME = "default";
 
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder().setShared(true);
+
     /** */
     public static IgniteConfiguration getServerConfiguration() {
-        TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
-
-        ipFinder.registerAddresses(Collections.singletonList(new 
InetSocketAddress("127.0.0.1", 47500)));
-
         TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
 
         discoverySpi.setIpFinder(ipFinder);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java 
b/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java
index 18ff338..c6e1593 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java
@@ -17,18 +17,16 @@
 
 package org.apache.ignite.client;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Random;
 import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.Ignition;
-import org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 
@@ -77,16 +75,18 @@ public class LocalIgniteCluster implements AutoCloseable {
     }
 
     /** {@inheritDoc} */
-    @Override public void close() {
+    @Override public synchronized void close() {
         srvs.forEach(Ignite::close);
 
         srvs.clear();
+
+        failedCfgs.clear();
     }
 
     /**
      * Remove one random node.
      */
-    public void failNode() {
+    public synchronized void failNode() {
         if (srvs.isEmpty())
             throw new IllegalStateException("Cannot remove node from empty 
cluster");
 
@@ -109,7 +109,7 @@ public class LocalIgniteCluster implements AutoCloseable {
     /**
      * Restore one of the failed nodes.
      */
-    public void restoreNode() {
+    public synchronized void restoreNode() {
         if (failedCfgs.isEmpty())
             throw new IllegalStateException("Cannot restore nodes in healthy 
cluster");
 
@@ -154,10 +154,6 @@ public class LocalIgniteCluster implements AutoCloseable {
     private static IgniteConfiguration getConfiguration(NodeConfiguration 
nodeCfg) {
         IgniteConfiguration igniteCfg = Config.getServerConfiguration();
 
-        
((TcpDiscoverySpi)igniteCfg.getDiscoverySpi()).getIpFinder().registerAddresses(
-            Collections.singletonList(new InetSocketAddress(HOST, 
nodeCfg.getDiscoveryPort()))
-        );
-
         igniteCfg.setClientConnectorConfiguration(new 
ClientConnectorConfiguration()
             .setHost(HOST)
             .setPort(nodeCfg.getClientPort())
diff --git 
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java 
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 9f933c8..646c52b 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -18,47 +18,36 @@
 package org.apache.ignite.client;
 
 import java.lang.management.ManagementFactory;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import javax.cache.Cache;
 import javax.management.MBeanServerInvocationHandler;
 import javax.management.ObjectName;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.configuration.ClientConfiguration;
-import org.apache.ignite.internal.client.thin.ClientServerError;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
-import org.apache.ignite.internal.processors.platform.client.ClientStatus;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.mxbean.ClientProcessorMXBean;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.junit.Rule;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
-import org.junit.rules.Timeout;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 /**
  * High Availability tests.
  */
-public class ReliabilityTest {
-    /** Per test timeout */
-    @Rule
-    public Timeout globalTimeout = new Timeout((int) 
GridTestUtils.DFLT_TEST_TIMEOUT);
-
+public class ReliabilityTest extends GridCommonAbstractTest {
     /**
      * Thin clint failover.
      */
@@ -89,6 +78,8 @@ public class ReliabilityTest {
                 assertEquals(val, cachedVal);
             });
 
+            cache.clear();
+
             // Composite operation failover: query
             Map<Integer, String> data = IntStream.rangeClosed(1, 1000).boxed()
                 .collect(Collectors.toMap(i -> i, i -> String.format("String 
%s", i)));
@@ -142,19 +133,13 @@ public class ReliabilityTest {
              IgniteClient client = Ignition.startClient(new 
ClientConfiguration()
                  .setAddresses(cluster.clientAddresses().iterator().next()))
         ) {
-            ObjectName mbeanName = 
U.makeMBeanName(Ignition.allGrids().get(0).name(), "Clients",
-                ClientListenerProcessor.class.getSimpleName());
-
-            ClientProcessorMXBean mxBean = 
MBeanServerInvocationHandler.newProxyInstance(
-                ManagementFactory.getPlatformMBeanServer(), mbeanName, 
ClientProcessorMXBean.class,true);
-
             ClientCache<Integer, Integer> cache = client.createCache("cache");
 
             // Before fail.
             cache.put(0, 0);
 
             // Fail.
-            mxBean.dropAllConnections();
+            dropAllThinClientConnections(Ignition.allGrids().get(0));
 
             try {
                 cache.put(0, 0);
@@ -168,62 +153,94 @@ public class ReliabilityTest {
         }
     }
 
-    /** */
-    @FunctionalInterface
-    private interface Assertion {
-        /** */
-        void call() throws Exception;
+    /**
+     * Test that failover doesn't lead to silent query inconsistency.
+     */
+    @Test
+    public void testQueryConsistencyOnFailover() throws Exception {
+        int CLUSTER_SIZE = 2;
+
+        try (LocalIgniteCluster cluster = 
LocalIgniteCluster.start(CLUSTER_SIZE);
+             IgniteClient client = Ignition.startClient(new 
ClientConfiguration()
+                 .setAddresses(cluster.clientAddresses().toArray(new 
String[CLUSTER_SIZE])))
+        ) {
+            ClientCache<Integer, Integer> cache = client.createCache("cache");
+
+            cache.put(0, 0);
+            cache.put(1, 1);
+
+            Query<Cache.Entry<Integer, String>> qry = new ScanQuery<Integer, 
String>().setPageSize(1);
+
+            try (QueryCursor<Cache.Entry<Integer, String>> cur = 
cache.query(qry)) {
+                int cnt = 0;
+
+                for (Iterator<Cache.Entry<Integer, String>> it = 
cur.iterator(); it.hasNext(); it.next()) {
+                    cnt++;
+
+                    if (cnt == 1) {
+                        for (int i = 0; i < CLUSTER_SIZE; i++)
+                            
dropAllThinClientConnections(Ignition.allGrids().get(i));
+                    }
+                }
+
+                fail("ClientReconnectedException must be thrown");
+            }
+            catch (ClientReconnectedException expected) {
+                // No-op.
+            }
+        }
     }
 
     /**
-     * Run the assertion while Ignite nodes keep failing/recovering 10 times.
+     * Drop all thin client connections on given Ignite instance.
+     *
+     * @param ignite Ignite.
      */
-    private static void assertOnUnstableCluster(LocalIgniteCluster cluster, 
Assertion assertion) {
-        // Keep changing Ignite cluster topology by adding/removing nodes
-        final AtomicBoolean isTopStable = new AtomicBoolean(false);
+    private void dropAllThinClientConnections(Ignite ignite) throws Exception {
+        ObjectName mbeanName = U.makeMBeanName(ignite.name(), "Clients",
+            ClientListenerProcessor.class.getSimpleName());
 
-        final AtomicReference<Throwable> err = new AtomicReference<>(null);
+        ClientProcessorMXBean mxBean = 
MBeanServerInvocationHandler.newProxyInstance(
+            ManagementFactory.getPlatformMBeanServer(), mbeanName, 
ClientProcessorMXBean.class, true);
+
+        mxBean.dropAllConnections();
+    }
+
+    /**
+     * Run the closure while Ignite nodes keep failing/recovering several 
times.
+     */
+    private void assertOnUnstableCluster(LocalIgniteCluster cluster, Runnable 
clo) throws Exception {
+        // Keep changing Ignite cluster topology by adding/removing nodes.
+        final AtomicBoolean stopFlag = new AtomicBoolean(false);
 
         Future<?> topChangeFut = Executors.newSingleThreadExecutor().submit(() 
-> {
-            for (int i = 0; i < 10 && err.get() == null; i++) {
-                while (cluster.size() != 1)
-                    cluster.failNode();
+            try {
+                for (int i = 0; i < 5 && !stopFlag.get(); i++) {
+                    while (cluster.size() != 1)
+                        cluster.failNode();
 
-                while (cluster.size() != cluster.getInitialSize())
-                    cluster.restoreNode();
+                    while (cluster.size() != cluster.getInitialSize())
+                        cluster.restoreNode();
+
+                    awaitPartitionMapExchange();
+                }
+            }
+            catch (InterruptedException ignore) {
+                // No-op.
             }
 
-            isTopStable.set(true);
+            stopFlag.set(true);
         });
 
-        // Use Ignite while the nodes keep failing
+        // Use Ignite while nodes keep failing.
         try {
-            while (err.get() == null && !isTopStable.get()) {
-                try {
-                    assertion.call();
-                }
-                catch (ClientServerError ex) {
-                    // TODO: fix CACHE_DOES_NOT_EXIST server error and remove 
this exception handler
-                    if (ex.getCode() != ClientStatus.CACHE_DOES_NOT_EXIST)
-                        throw ex;
-                }
-            }
-        }
-        catch (Throwable e) {
-            err.set(e);
-        }
+            while (!stopFlag.get())
+                clo.run();
 
-        try {
             topChangeFut.get();
         }
-        catch (Exception e) {
-            err.set(e);
+        finally {
+            stopFlag.set(true);
         }
-
-        Throwable ex = err.get();
-
-        String msg = ex == null ? "" : ex.getMessage();
-
-        assertNull(msg, ex);
     }
 }

Reply via email to