This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 71d0eb8 IGNITE-11670 Java thin client: Fixed queries inconsistency on
failover - Fixes #6563
71d0eb8 is described below
commit 71d0eb830145100d799eba5e56ed94ead1c92f4e
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Mon May 27 16:01:09 2019 +0300
IGNITE-11670 Java thin client: Fixed queries inconsistency on failover -
Fixes #6563
---
.../ClientReconnectedException.java} | 27 ++--
.../internal/client/thin/ClientQueryCursor.java | 26 +++-
.../internal/client/thin/GenericQueryPager.java | 30 +++--
.../ignite/internal/client/thin/QueryPager.java | 5 +
.../test/java/org/apache/ignite/client/Config.java | 8 +-
.../apache/ignite/client/LocalIgniteCluster.java | 16 +--
.../org/apache/ignite/client/ReliabilityTest.java | 141 ++++++++++++---------
7 files changed, 143 insertions(+), 110 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
b/modules/core/src/main/java/org/apache/ignite/client/ClientReconnectedException.java
similarity index 59%
copy from
modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
copy to
modules/core/src/main/java/org/apache/ignite/client/ClientReconnectedException.java
index e985689..4153f7e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
+++
b/modules/core/src/main/java/org/apache/ignite/client/ClientReconnectedException.java
@@ -15,25 +15,26 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin;
-
-import java.util.Collection;
-import org.apache.ignite.client.ClientException;
+package org.apache.ignite.client;
/**
- * Extracts paged data
+ * Indicates that the previous connection was lost and a new connection was
established,
+ * which can lead to inconsistency of non-atomic operations.
*/
-interface QueryPager<T> extends AutoCloseable {
+public class ClientReconnectedException extends ClientException {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
/**
- * Reads next page. Call {@link this#hasNext()} to check if there is data
to read before calling this method.
+ * Default constructor.
*/
- public Collection<T> next() throws ClientException;
+ public ClientReconnectedException() {
+ }
/**
- * @return {@code true} if there are more pages to read; {@code false}
otherwise.
+ * Constructs a new exception with the specified message.
*/
- public boolean hasNext();
-
- /** Indicates if initial query response was received. */
- public boolean hasFirstPage();
+ public ClientReconnectedException(String msg) {
+ super(msg);
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryCursor.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryCursor.java
index cdf94f0..82fbac8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryCursor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryCursor.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.NoSuchElementException;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientReconnectedException;
import org.jetbrains.annotations.NotNull;
/**
@@ -39,12 +40,20 @@ class ClientQueryCursor<T> implements QueryCursor<T> {
/** {@inheritDoc} */
@Override public List<T> getAll() {
- List<T> res = new ArrayList<>();
+ while (true) {
+ try {
+ List<T> res = new ArrayList<>();
- for (T ent : this)
- res.add(ent);
+ for (T ent : this)
+ res.add(ent);
- return res;
+ return res;
+ }
+ catch (ClientReconnectedException ex) {
+ // If we were reconnected to a new server, we can retry the entire
query to fail over.
+ pager.reset();
+ }
+ }
}
/** {@inheritDoc} */
@@ -85,10 +94,13 @@ class ClientQueryCursor<T> implements QueryCursor<T> {
return currPageIt;
}
- catch (ClientException e) {
- throw e;
- }
catch (Exception e) {
+ if (e instanceof ClientException)
+ throw (ClientException)e;
+
+ if (e instanceof ClientError)
+ throw (ClientError)e;
+
throw new ClientException("Failed to retrieve query
results", e);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
index ce15cae..30c73cf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
@@ -19,11 +19,11 @@ package org.apache.ignite.internal.client.thin;
import java.util.Collection;
import java.util.function.Consumer;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientReconnectedException;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
-import org.apache.ignite.client.ClientException;
-import org.apache.ignite.client.ClientConnectionException;
/**
* Generic query pager. Override {@link this#readResult(BinaryInputStream)} to
make it specific.
@@ -88,6 +88,15 @@ abstract class GenericQueryPager<T> implements QueryPager<T>
{
return hasFirstPage;
}
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ hasFirstPage = false;
+
+ hasNext = true;
+
+ cursorId = null;
+ }
+
/**
* Override this method to read entries from the input stream. "Entries"
means response data excluding heading
* cursor ID and trailing "has next page" flag.
@@ -119,21 +128,18 @@ abstract class GenericQueryPager<T> implements
QueryPager<T> {
return res;
}
- /** Get page with failover. */
+ /** Get page. */
private Collection<T> queryPage() throws ClientException {
try {
return ch.service(pageQryOp, req -> req.writeLong(cursorId),
this::readResult);
}
catch (ClientServerError ex) {
- if (ex.getCode() != ClientStatus.RESOURCE_DOES_NOT_EXIST)
- throw ex;
- }
- catch (ClientConnectionException ignored) {
- }
-
- // Retry entire query to failover
- hasFirstPage = false;
+ if (ex.getCode() == ClientStatus.RESOURCE_DOES_NOT_EXIST) {
+ throw new ClientReconnectedException("Client was reconnected
in the middle of results fetch, " +
+ "query results can be inconsistent, please retry the
query.");
+ }
- return ch.service(qryOp, qryWriter, this::readResult);
+ throw ex;
+ }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
index e985689..5b90b5c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/QueryPager.java
@@ -36,4 +36,9 @@ interface QueryPager<T> extends AutoCloseable {
/** Indicates if initial query response was received. */
public boolean hasFirstPage();
+
+ /**
+ * Reset query pager.
+ */
+ public void reset();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/client/Config.java
b/modules/core/src/test/java/org/apache/ignite/client/Config.java
index 3e8e0e1..c279e8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/Config.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/Config.java
@@ -17,8 +17,6 @@
package org.apache.ignite.client;
-import java.net.InetSocketAddress;
-import java.util.Collections;
import java.util.UUID;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -34,12 +32,10 @@ public class Config {
/** Name of the cache created by default in the cluster. */
public static final String DEFAULT_CACHE_NAME = "default";
+ private static final TcpDiscoveryIpFinder ipFinder = new
TcpDiscoveryVmIpFinder().setShared(true);
+
/** */
public static IgniteConfiguration getServerConfiguration() {
- TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
-
- ipFinder.registerAddresses(Collections.singletonList(new
InetSocketAddress("127.0.0.1", 47500)));
-
TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
discoverySpi.setIpFinder(ipFinder);
diff --git
a/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java
b/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java
index 18ff338..c6e1593 100644
---
a/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java
+++
b/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java
@@ -17,18 +17,16 @@
package org.apache.ignite.client;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
-import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -77,16 +75,18 @@ public class LocalIgniteCluster implements AutoCloseable {
}
/** {@inheritDoc} */
- @Override public void close() {
+ @Override public synchronized void close() {
srvs.forEach(Ignite::close);
srvs.clear();
+
+ failedCfgs.clear();
}
/**
* Remove one random node.
*/
- public void failNode() {
+ public synchronized void failNode() {
if (srvs.isEmpty())
throw new IllegalStateException("Cannot remove node from empty
cluster");
@@ -109,7 +109,7 @@ public class LocalIgniteCluster implements AutoCloseable {
/**
* Restore one of the failed nodes.
*/
- public void restoreNode() {
+ public synchronized void restoreNode() {
if (failedCfgs.isEmpty())
throw new IllegalStateException("Cannot restore nodes in healthy
cluster");
@@ -154,10 +154,6 @@ public class LocalIgniteCluster implements AutoCloseable {
private static IgniteConfiguration getConfiguration(NodeConfiguration
nodeCfg) {
IgniteConfiguration igniteCfg = Config.getServerConfiguration();
-
((TcpDiscoverySpi)igniteCfg.getDiscoverySpi()).getIpFinder().registerAddresses(
- Collections.singletonList(new InetSocketAddress(HOST,
nodeCfg.getDiscoveryPort()))
- );
-
igniteCfg.setClientConnectorConfiguration(new
ClientConnectorConfiguration()
.setHost(HOST)
.setPort(nodeCfg.getClientPort())
diff --git
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 9f933c8..646c52b 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -18,47 +18,36 @@
package org.apache.ignite.client;
import java.lang.management.ManagementFactory;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.cache.Cache;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
+import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.ClientConfiguration;
-import org.apache.ignite.internal.client.thin.ClientServerError;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
-import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.junit.Rule;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
-import org.junit.rules.Timeout;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
/**
* High Availability tests.
*/
-public class ReliabilityTest {
- /** Per test timeout */
- @Rule
- public Timeout globalTimeout = new Timeout((int)
GridTestUtils.DFLT_TEST_TIMEOUT);
-
+public class ReliabilityTest extends GridCommonAbstractTest {
/**
* Thin client failover.
*/
@@ -89,6 +78,8 @@ public class ReliabilityTest {
assertEquals(val, cachedVal);
});
+ cache.clear();
+
// Composite operation failover: query
Map<Integer, String> data = IntStream.rangeClosed(1, 1000).boxed()
.collect(Collectors.toMap(i -> i, i -> String.format("String
%s", i)));
@@ -142,19 +133,13 @@ public class ReliabilityTest {
IgniteClient client = Ignition.startClient(new
ClientConfiguration()
.setAddresses(cluster.clientAddresses().iterator().next()))
) {
- ObjectName mbeanName =
U.makeMBeanName(Ignition.allGrids().get(0).name(), "Clients",
- ClientListenerProcessor.class.getSimpleName());
-
- ClientProcessorMXBean mxBean =
MBeanServerInvocationHandler.newProxyInstance(
- ManagementFactory.getPlatformMBeanServer(), mbeanName,
ClientProcessorMXBean.class,true);
-
ClientCache<Integer, Integer> cache = client.createCache("cache");
// Before fail.
cache.put(0, 0);
// Fail.
- mxBean.dropAllConnections();
+ dropAllThinClientConnections(Ignition.allGrids().get(0));
try {
cache.put(0, 0);
@@ -168,62 +153,94 @@ public class ReliabilityTest {
}
}
- /** */
- @FunctionalInterface
- private interface Assertion {
- /** */
- void call() throws Exception;
+ /**
+ * Test that failover doesn't lead to silent query inconsistency.
+ */
+ @Test
+ public void testQueryConsistencyOnFailover() throws Exception {
+ int CLUSTER_SIZE = 2;
+
+ try (LocalIgniteCluster cluster =
LocalIgniteCluster.start(CLUSTER_SIZE);
+ IgniteClient client = Ignition.startClient(new
ClientConfiguration()
+ .setAddresses(cluster.clientAddresses().toArray(new
String[CLUSTER_SIZE])))
+ ) {
+ ClientCache<Integer, Integer> cache = client.createCache("cache");
+
+ cache.put(0, 0);
+ cache.put(1, 1);
+
+ Query<Cache.Entry<Integer, String>> qry = new ScanQuery<Integer,
String>().setPageSize(1);
+
+ try (QueryCursor<Cache.Entry<Integer, String>> cur =
cache.query(qry)) {
+ int cnt = 0;
+
+ for (Iterator<Cache.Entry<Integer, String>> it =
cur.iterator(); it.hasNext(); it.next()) {
+ cnt++;
+
+ if (cnt == 1) {
+ for (int i = 0; i < CLUSTER_SIZE; i++)
+
dropAllThinClientConnections(Ignition.allGrids().get(i));
+ }
+ }
+
+ fail("ClientReconnectedException must be thrown");
+ }
+ catch (ClientReconnectedException expected) {
+ // No-op.
+ }
+ }
}
/**
- * Run the assertion while Ignite nodes keep failing/recovering 10 times.
+ * Drop all thin client connections on given Ignite instance.
+ *
+ * @param ignite Ignite.
*/
- private static void assertOnUnstableCluster(LocalIgniteCluster cluster,
Assertion assertion) {
- // Keep changing Ignite cluster topology by adding/removing nodes
- final AtomicBoolean isTopStable = new AtomicBoolean(false);
+ private void dropAllThinClientConnections(Ignite ignite) throws Exception {
+ ObjectName mbeanName = U.makeMBeanName(ignite.name(), "Clients",
+ ClientListenerProcessor.class.getSimpleName());
- final AtomicReference<Throwable> err = new AtomicReference<>(null);
+ ClientProcessorMXBean mxBean =
MBeanServerInvocationHandler.newProxyInstance(
+ ManagementFactory.getPlatformMBeanServer(), mbeanName,
ClientProcessorMXBean.class, true);
+
+ mxBean.dropAllConnections();
+ }
+
+ /**
+ * Run the closure while Ignite nodes keep failing/recovering several
times.
+ */
+ private void assertOnUnstableCluster(LocalIgniteCluster cluster, Runnable
clo) throws Exception {
+ // Keep changing Ignite cluster topology by adding/removing nodes.
+ final AtomicBoolean stopFlag = new AtomicBoolean(false);
Future<?> topChangeFut = Executors.newSingleThreadExecutor().submit(()
-> {
- for (int i = 0; i < 10 && err.get() == null; i++) {
- while (cluster.size() != 1)
- cluster.failNode();
+ try {
+ for (int i = 0; i < 5 && !stopFlag.get(); i++) {
+ while (cluster.size() != 1)
+ cluster.failNode();
- while (cluster.size() != cluster.getInitialSize())
- cluster.restoreNode();
+ while (cluster.size() != cluster.getInitialSize())
+ cluster.restoreNode();
+
+ awaitPartitionMapExchange();
+ }
+ }
+ catch (InterruptedException ignore) {
+ // No-op.
}
- isTopStable.set(true);
+ stopFlag.set(true);
});
- // Use Ignite while the nodes keep failing
+ // Use Ignite while nodes keep failing.
try {
- while (err.get() == null && !isTopStable.get()) {
- try {
- assertion.call();
- }
- catch (ClientServerError ex) {
- // TODO: fix CACHE_DOES_NOT_EXIST server error and remove
this exception handler
- if (ex.getCode() != ClientStatus.CACHE_DOES_NOT_EXIST)
- throw ex;
- }
- }
- }
- catch (Throwable e) {
- err.set(e);
- }
+ while (!stopFlag.get())
+ clo.run();
- try {
topChangeFut.get();
}
- catch (Exception e) {
- err.set(e);
+ finally {
+ stopFlag.set(true);
}
-
- Throwable ex = err.get();
-
- String msg = ex == null ? "" : ex.getMessage();
-
- assertNull(msg, ex);
}
}