This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 564d5db IGNITE-11320: Support for individual reconnect in case of
best effort affinity mode added. (#7152)
564d5db is described below
commit 564d5dbc139c8a78d28b76cd7e2d211ab6f0f929
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Thu Dec 19 21:04:44 2019 +0300
IGNITE-11320: Support for individual reconnect in case of best effort
affinity mode added. (#7152)
---
...teJdbcThinDriverAffinityAwarenessTestSuite.java | 2 +
...cThinAffinityAwarenessReconnectionSelfTest.java | 398 +++++++++++++++++++
.../thin/JdbcThinAffinityAwarenessSelfTest.java | 104 ++---
.../jdbc/thin/JdbcThinConnectionSelfTest.java | 5 +-
.../ignite/internal/jdbc/thin/AffinityCache.java | 2 +-
.../internal/jdbc/thin/JdbcThinConnection.java | 426 +++++++++++++++------
.../ignite/internal/jdbc/thin/JdbcThinTcpIo.java | 23 +-
7 files changed, 766 insertions(+), 194 deletions(-)
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java
index 888d65e..3937fe2 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.jdbc.suite;
import org.apache.ignite.jdbc.thin.JdbcThinAbstractSelfTest;
+import
org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessReconnectionSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessSelfTest;
import
org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessTransactionsSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinConnectionSelfTest;
@@ -38,6 +39,7 @@ import org.junit.runners.Suite;
JdbcThinStatementSelfTest.class,
JdbcThinAffinityAwarenessSelfTest.class,
JdbcThinAffinityAwarenessTransactionsSelfTest.class,
+ JdbcThinAffinityAwarenessReconnectionSelfTest.class,
})
public class IgniteJdbcThinDriverAffinityAwarenessTestSuite {
/**
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java
new file mode 100644
index 0000000..3a3cde9
--- /dev/null
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.jdbc.thin.JdbcThinConnection;
+import org.apache.ignite.internal.jdbc.thin.JdbcThinTcpIo;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Jdbc thin affinity awareness reconnection test.
+ */
+public class JdbcThinAffinityAwarenessReconnectionSelfTest extends
JdbcThinAbstractSelfTest {
+ /** URL. */
+ private static final String URL =
"jdbc:ignite:thin://127.0.0.1:10800..10802?affinityAwareness=true";
+
+ /** Nodes count. */
+ private static final int INITIAL_NODES_CNT = 3;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(INITIAL_NODES_CNT);
+ }
+
+ /**
+ * Check that background connection establishment works as expected.
+ * <p>
+ * Within new reconnection logic in affinity awareness mode when {@code
JdbcThinConnection} is created
+ * it eagerly establishes a connection to one and only one ignite node.
All other connections to nodes specified in
+ * connection properties are established by background thread.
+ * <p>
+ * So in given test we specify url with 3 different ports and verify that
3 connections will be created:
+ * one in eager mode and two within background thread. It takes some time
for background thread to create
+ * a connection, and cause, in addition to that it runs periodically with
some delay,
+ * {@code GridTestUtils.waitForCondition} is used in order to check that
all expected connections are established.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBackgroundConnectionEstablishment() throws Exception {
+ try (Connection conn = DriverManager.getConnection(URL)) {
+ Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn,
"ios");
+
+ assertConnectionsCount(ios, 3);
+ }
+ }
+
+ /**
+ * Test connection failover:
+ * <ol>
+ * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT}
connections are established.</li>
+ * <li>Stop one node, invalidate dead connection (jdbc thin, won't detect
that node has gone,
+ * until it tries to touch it) and verify, that connections count has
decremented. </li>
+ * <li>Start, previously stopped node, and check that connections count
also restored to initial value.</li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testConnectionFailover() throws Exception {
+ try (Connection conn = DriverManager.getConnection(URL)) {
+ Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn,
"ios");
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+
+ assertEquals("Unexpected connections count.", INITIAL_NODES_CNT,
ios.size());
+
+ stopGrid(1);
+
+ invalidateConnectionToStoppedNode(conn);
+
+ assertEquals("Unexpected connections count.", INITIAL_NODES_CNT -
1, ios.size());
+
+ startGrid(1);
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+ }
+ }
+
+ /**
+ * Test total connection failover:
+ * <ol>
+ * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT}
connections are established.</li>
+ * <li>Stop all nodes, invalidate dead connections (jdbc thin, won't
detect that node has gone,
+ * until it tries to touch it) and verify, that connections count equals
to zero. </li>
+ * <li>Start, previously stopped nodes, and check that connections count
also restored to initial value.</li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTotalConnectionFailover() throws Exception {
+ try(Connection conn = DriverManager.getConnection(URL)) {
+ Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn,
"ios");
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+
+ for (int i = 0; i < INITIAL_NODES_CNT; i++) {
+ stopGrid(i);
+ invalidateConnectionToStoppedNode(conn);
+ }
+
+ assertConnectionsCount(ios, 0);
+
+ for (int i = 0; i < INITIAL_NODES_CNT; i++)
+ startGrid(i);
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+ }
+ }
+
+ /**
+ * Test eager connection failover:
+ * <ol>
+ * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT}
connections are established.</li>
+ * <li>Stop all nodes, invalidate dead connections (jdbc thin, won't
detect that node has gone,
+ * until it tries to touch it) and verify, that connections count equals
to zero. </li>
+ * <li>Wait for some time, in order for reconnection thread to increase
delay between connection attempts,
+ * because of reconnection failures.</li>
+ * <li>Start, previously stopped nodes, and send simple query immediately.
Eager reconnection is expected.
+ * <b>NOTE</b>:There's still a chance that connection would be recreated
by background thread and not eager process.
+ * In order to decrease given possibility we've waited for some time on
previous step.</li>
+ * <li>Ensure that after some time all connections will be restored.</li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testEagerConnectionFailover() throws Exception {
+ try(Connection conn = DriverManager.getConnection(URL)) {
+ Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn,
"ios");
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+
+ for (int i = 0; i < INITIAL_NODES_CNT; i++) {
+ stopGrid(i);
+ invalidateConnectionToStoppedNode(conn);
+ }
+
+ assertEquals("Unexpected connections count.", 0, ios.size());
+
+ doSleep(4 * JdbcThinConnection.RECONNECTION_DELAY);
+
+ for (int i = 0; i < INITIAL_NODES_CNT; i++)
+ startGrid(i);
+
+ conn.createStatement().execute("select 1;");
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+ }
+ }
+
+ /**
+ * Check that reconnection thread increases delay between unsuccessful
connection attempts:
+ * <ol>
+ * <li>Specify two inet addresses one valid and one inoperative.</li>
+ * <li>Wait for specific amount of time. The reconnection logic suppose to
increase delays between reconnection
+ * attempts. The basic idea is very simple: delay is doubled on evey
connection failure until connection succeeds
+ * or until delay exceeds predefined maximum value {@code
JdbcThinConnection.RECONNECTION_MAX_DELAY}
+ * <pre>
+ * |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _|
+ * where: '|' is connection attempt;
+ * '_' is an amount of time that reconnection tread waits, equal
to JdbcThinConnection.RECONNECTION_DELAY;
+ *
+ * so if we wait for 9 * RECONNECTION_DELAY, we expect to see exact four
connection attempts:
+ * |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _|
+ * </pre>
+ * </li>
+ * <li>Check that there were exact four reconnection attempts. In order to
do this, we check logs, expecting to see
+ * four warning messages there.</li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReconnectionDelayIncreasing() throws Exception {
+ Logger log = Logger.getLogger(JdbcThinConnection.class.getName());
+ LogHandler hnd = new LogHandler();
+ hnd.setLevel(Level.ALL);
+ log.setUseParentHandlers(false);
+ log.addHandler(hnd);
+ log.setLevel(Level.ALL);
+
+ try (Connection ignored = DriverManager.getConnection(
+
"jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810?affinityAwareness=true")) {
+ hnd.records.clear();
+
+ doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+ assertEquals("Unexpected log records count.", 4,
hnd.records.size());
+
+ String expRecordMsg = "Failed to connect to Ignite node " +
+ "[url=jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810].
address = [localhost/127.0.0.1:10810].";
+
+ for (LogRecord record: hnd.records) {
+ assertEquals("Unexpected log record text.", expRecordMsg,
record.getMessage());
+ assertEquals("Unexpected log level", Level.WARNING,
record.getLevel());
+ }
+ }
+ }
+
+ /**
+ * Check that reconnection thread selectively increases delay between
unsuccessful connection attempts:
+ * <ol>
+ * <li>Create {@code JdbcThinConnection} with two valid inet
addresses.</li>
+ * <li>Stop one node and invalidate corresponding connection. Ensure that
only one connection left.</li>
+ * <li>Wait for specific amount of time. The reconnection logic suppose to
increase delays between reconnection
+ * attempts. The basic idea is very simple: delay is doubled on evey
connection failure until connection succeeds
+ * or until delay exceeds predefined maximum value {@code
JdbcThinConnection.RECONNECTION_MAX_DELAY}
+ * <pre>
+ * |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _|
+ * where: '|' is connection attempt;
+ * '_' is an amount of time that reconnection tread waits, equal
to JdbcThinConnection.RECONNECTION_DELAY;
+ *
+ * so if we wait for 9 * RECONNECTION_DELAY, we expect to see exact four
connection attempts:
+ * |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _|
+ * </pre>
+ * </li>
+ * <li>Check that there were exact four reconnection attempts. In order to
do this, we check logs, expecting to see
+ * four warning messages there.</li>
+ * <li>Start previously stopped node.</li>
+ * <li>Wait until next reconnection attempt.</li>
+ * <li>Check that both connections are established and that there are no
warning messages within logs.</li>
+ * <li>One more time: stop one node and invalidate corresponding
connection.
+ * Ensure that only one connection left.</li>
+ * <li>Wait for some time.</li>
+ * <li>Ensure that delay between reconnection was reset to initial value.
+ * In other words, we again expect four warning messages within
logs.</li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReconnectionDelaySelectiveIncreasing() throws Exception {
+ Logger log = Logger.getLogger(JdbcThinConnection.class.getName());
+ LogHandler hnd = new LogHandler();
+ hnd.setLevel(Level.ALL);
+ log.setUseParentHandlers(false);
+ log.addHandler(hnd);
+ log.setLevel(Level.ALL);
+
+ try (Connection conn = DriverManager.getConnection(
+
"jdbc:ignite:thin://127.0.0.1:10800..10801?affinityAwareness=true")) {
+ // Stop one node and invalidate corresponding connection. Ensure
that only one connection left.
+ stopGrid(0);
+
+ invalidateConnectionToStoppedNode(conn);
+
+ Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn,
"ios");
+
+ assertEquals("Unexpected connections count.", 1, ios.size());
+
+ hnd.records.clear();
+
+ // Wait for some specific amount of time and ensure that there
were exact four reconnection attempts.
+ doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+ assertEquals("Unexpected log records count.", 4,
hnd.records.size());
+
+ String expRecordMsg = "Failed to connect to Ignite node
[url=jdbc:ignite:thin://127.0.0.1:10800..10801]." +
+ " address = [localhost/127.0.0.1:10800].";
+
+ for (LogRecord record: hnd.records) {
+ assertEquals("Unexpected log record text.", expRecordMsg,
record.getMessage());
+ assertEquals("Unexpected log level", Level.WARNING,
record.getLevel());
+ }
+
+ // Start previously stopped node.
+ startGrid(0);
+
+ hnd.records.clear();
+
+ // Waiting until next reconnection attempt.
+ doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+ // Checking that both connections are established and that there
are no warning messages within logs.
+ assertEquals("Unexpected log records count.", 0,
hnd.records.size());
+
+ assertEquals("Unexpected connections count.", 2, ios.size());
+
+ // One more time: stop one node, invalidate corresponding
connection and ensure that only one connection
+ // left.
+ stopGrid(0);
+
+ invalidateConnectionToStoppedNode(conn);
+
+ assertEquals("Unexpected connections count.", 1, ios.size());
+
+ hnd.records.clear();
+
+ // Wait for some time and ensure that delay between reconnection
was reset to initial value.
+ doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+ assertEquals("Unexpected log records count.", 4,
hnd.records.size());
+
+ for (LogRecord record: hnd.records) {
+ assertEquals("Unexpected log record text.", expRecordMsg,
record.getMessage());
+ assertEquals("Unexpected log level", Level.WARNING,
record.getLevel());
+ }
+
+ startGrid(0);
+ }
+ }
+
+ /**
+ * Assert connections count.
+ *
+ * @param ios Map that holds connections.
+ * @param expConnCnt Expected connections count.
+ */
+ private void assertConnectionsCount(Map<UUID, JdbcThinTcpIo> ios, int
expConnCnt)
+ throws IgniteInterruptedCheckedException {
+ boolean allConnectionsEstablished = GridTestUtils.waitForCondition(()
-> ios.size() == expConnCnt,
+ 10_000);
+
+ assertTrue("Unexpected connections count.", allConnectionsEstablished);
+ }
+
+ /**
+ * Invalidate connection to stopped node. Jdbc thin, won't detect that
node has gone, until it tries to touch it.
+ * So sending simple query to randomly chosen connection(socket), sooner
or later, will touch dead one,
+ * and thus invalidate it.
+ *
+ * @param conn Connections.
+ */
+ private void invalidateConnectionToStoppedNode(Connection conn) {
+ while (true) {
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute("select 1");
+ }
+ catch (SQLException e) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Simple {@code java.util.logging.Handler} implementation in order to
check log records
+ * generated by {@code JdbcThinConnection}.
+ */
+ static class LogHandler extends Handler {
+
+ /** Log records. */
+ private final List<LogRecord> records = new ArrayList<>();
+
+ /** {@inheritDoc} */
+ @Override public void publish(LogRecord record) {
+ records.add(record);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() {
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void flush() {
+ }
+
+ /**
+ * @return Records.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") public
List<LogRecord> records() {
+ return records;
+ }
+ }
+}
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java
index 22e5d0d..f0e632c 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java
@@ -30,7 +30,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -133,14 +132,14 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
*/
@Test
public void testExecuteQueries() throws Exception {
- checkNodesUsage(null, "select * from Person where _key = 1", 1, 1,
+ checkNodesUsage(null, stmt, "select * from Person where _key = 1", 1,
1,
false);
- checkNodesUsage(null, "select * from Person where _key = 1 or _key =
2", 2,
+ checkNodesUsage(null, stmt, "select * from Person where _key = 1 or
_key = 2", 2,
2, false);
- checkNodesUsage(null, "select * from Person where _key in (1, 2)", 2,
2,
- false);
+ checkNodesUsage(null, stmt, "select * from Person where _key in (1,
2)", 2,
+ 2, false);
}
/**
@@ -155,7 +154,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
ps.setInt(1, 2);
- checkNodesUsage(ps, null, 1, 1, false);
+ checkNodesUsage(ps, null, null, 1, 1, false);
// Use case 2.
ps = conn.prepareStatement("select * from Person where _key = ? or
_key = ?");
@@ -164,7 +163,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
ps.setInt(2, 2);
- checkNodesUsage(ps, null, 2, 2, false);
+ checkNodesUsage(ps, null, null, 2, 2, false);
// Use case 3.
ps = conn.prepareStatement("select * from Person where _key in (?,
?)");
@@ -173,7 +172,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
ps.setInt(2, 2);
- checkNodesUsage(ps, null, 2, 2, false);
+ checkNodesUsage(ps, null, null, 2, 2, false);
}
/**
@@ -183,13 +182,13 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
*/
@Test
public void testUpdateQueries() throws Exception {
- checkNodesUsage(null, "update Person set firstName = 'TestFirstName'
where _key = 1",
+ checkNodesUsage(null, stmt, "update Person set firstName =
'TestFirstName' where _key = 1",
1, 1, true);
- checkNodesUsage(null, "update Person set firstName = 'TestFirstName'
where _key = 1 or _key = 2",
+ checkNodesUsage(null, stmt, "update Person set firstName =
'TestFirstName' where _key = 1 or _key = 2",
2, 2, true);
- checkNodesUsage(null, "update Person set firstName = 'TestFirstName'
where _key in (1, 2)",
+ checkNodesUsage(null, stmt, "update Person set firstName =
'TestFirstName' where _key in (1, 2)",
2, 2, true);
}
@@ -206,7 +205,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
ps.setInt(1, 2);
- checkNodesUsage(ps, null, 1, 1, true);
+ checkNodesUsage(ps, null, null, 1, 1, true);
// Use case 2.
ps = conn.prepareStatement("update Person set firstName =
'TestFirstName' where _key = ? or _key = ?");
@@ -215,7 +214,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
ps.setInt(2, 2);
- checkNodesUsage(ps, null, 2, 2, true);
+ checkNodesUsage(ps, null, null, 2, 2, true);
// Use case 3.
ps = conn.prepareStatement("update Person set firstName =
'TestFirstName' where _key in (?, ?)");
@@ -224,7 +223,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
ps.setInt(2, 2);
- checkNodesUsage(ps, null, 2, 2, true);
+ checkNodesUsage(ps, null, null, 2, 2, true);
}
/**
@@ -235,12 +234,12 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
@Test
public void testDeleteQueries() throws Exception {
// In case of simple query like "delete from Person where _key = 1"
fast update logic is used,
- // so parition result is not calculated on the server side - nothing
to check.
+ // so partition result is not calculated on the server side - nothing
to check.
- checkNodesUsage(null, "delete from Person where _key = 10000 or _key =
20000",
+ checkNodesUsage(null, stmt, "delete from Person where _key = 10000 or
_key = 20000",
2, 0, true);
- checkNodesUsage(null, "delete from Person where _key in (10000,
20000)",
+ checkNodesUsage(null, stmt, "delete from Person where _key in (10000,
20000)",
2, 0, true);
}
@@ -252,7 +251,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
@Test
public void testDeleteParametrizedQueries() throws Exception {
// In case of simple query like "delete from Person where _key = ?"
fast update logic is used,
- // so parition result is not calculated on the server side - nothing
to check.
+ // so partition result is not calculated on the server side - nothing
to check.
// Use case 1.
PreparedStatement ps = conn.prepareStatement("delete from Person where
_key = ? or _key = ?");
@@ -261,7 +260,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
ps.setInt(2, 2000);
- checkNodesUsage(ps, null, 2, 0, true);
+ checkNodesUsage(ps, null, null, 2, 0, true);
// Use case 2.
ps = conn.prepareStatement("delete from Person where _key in (?, ?)");
@@ -270,7 +269,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
ps.setInt(2, 2000);
- checkNodesUsage(ps, null, 2, 0, true);
+ checkNodesUsage(ps, null, null, 2, 0, true);
}
/**
@@ -352,14 +351,14 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
fillCache(cacheName);
- checkNodesUsage(null,
+ checkNodesUsage(null, stmt,
"select * from \"" + cacheName + "\".Person where _key = 1",
1, 1, false);
}
/**
* Check that affinity cache is invalidated in case of changing topology,
- * detected during partions destribution retrieval.
+ * detected during partitions distribution retrieval.
*
* @throws Exception If failed.
*/
@@ -483,7 +482,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
* @throws Exception If failed.
*/
@Test
- public void testAffinityCacheStoresSchemaBindedQuries() throws Exception {
+ public void testAffinityCacheStoresSchemaBindedQueries() throws Exception {
final String cacheName = "yacc";
CacheConfiguration<Object, Object> cache =
prepareCacheConfig(cacheName);
@@ -515,12 +514,12 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
}
/**
- * Check that affinity cache stores compacted version of partitoins
destributions.
+ * Check that affinity cache stores compacted version of partitions
distributions.
*
* @throws Exception If failed.
*/
@Test
- public void testAffinityCacheCompactsPartitonDestributions() throws
Exception {
+ public void testAffinityCacheCompactsPartitionDistributions() throws
Exception {
final String cacheName = "yaccc";
CacheConfiguration<Object, Object> cache =
prepareCacheConfig(cacheName);
@@ -546,56 +545,16 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
assertEquals("Sql sub-cache of affinity cache has unexpected number of
elements.",
2, sqlCache.size());
- assertEquals("Partitions destribution sub-cache of affinity cache has
unexpected number of elements.",
+ assertEquals("Partitions distribution sub-cache of affinity cache has
unexpected number of elements.",
2, cachePartitionsDistribution.size());
- // Main assertition of the test: we are checking that partitions
destributions for different caches
+ // Main assertion of the test: we are checking that partitions
distributions for different caches
// are equal in therms of (==)
assertTrue("Partitions distributions are not the same.",
cachePartitionsDistribution.get(0) ==
cachePartitionsDistribution.get(1));
}
/**
- * Check that affinity awareness works fine after reconnection.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testReconnect() throws Exception {
- checkNodesUsage(null, "select * from Person where _key = 3", 1, 1,
- false);
-
- startGrid(7);
-
- for(int i = 0; i < NODES_CNT; i++)
- stopGrid(i);
-
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- stmt.execute("select * from Person where _key = 3");
-
- return null;
- }
- }, SQLException.class, "Failed to communicate with Ignite cluster.");
-
- for(int i = 0; i < NODES_CNT; i++)
- startGrid(i);
-
- stopGrid(4);
- stopGrid(5);
- stopGrid(6);
- stopGrid(7);
-
- stmt = conn.createStatement();
-
- // We need this extra query to invalidate obsolete affinity cache
- stmt.execute("select * from Person where _key = 3");
-
- checkNodesUsage(null, "select * from Person where _key = 3", 1, 1,
- false);
- }
-
- /**
* Prepares default cache configuration with given name.
*
* @param cacheName Cache name.
@@ -607,6 +566,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
cache.setName(cacheName);
cache.setCacheMode(PARTITIONED);
+ cache.setBackups(1);
cache.setIndexedTypes(
Integer.class, Person.class
);
@@ -615,8 +575,8 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
}
/**
- * Utitlity method that executes given query and verifies that expeted
number of records was returned.
- * Besides that given method verified that partitoin result for
corresponding query is null.
+ * Utility method that executes given query and verifies that expected
number of records was returned.
+ * Besides that given method verified that partition result for
corresponding query is null.
*
* @param sqlQry Sql query.
* @param expRowsCnt Expected rows count.
@@ -656,8 +616,8 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
* @param dml Flag that signals whether we execute dml or not.
* @throws Exception If failed.
*/
- private void checkNodesUsage(PreparedStatement ps, String sql, int
maxNodesUsedCnt, int expRowsCnt, boolean dml)
- throws Exception {
+ private void checkNodesUsage(PreparedStatement ps, Statement stmt, String
sql, int maxNodesUsedCnt, int expRowsCnt,
+ boolean dml) throws Exception {
// Warm up an affinity cache.
if (ps != null)
if (dml)
@@ -729,7 +689,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends
JdbcThinAbstractSelfTest
"], got [" + nonEmptyMetricsCntr + "]",
nonEmptyMetricsCntr > 0 && nonEmptyMetricsCntr <= maxNodesUsedCnt);
- assertEquals("Executions count doesn't match expeted value: expected
[" +
+ assertEquals("Executions count doesn't match expected value: expected
[" +
NODES_CNT * QUERY_EXECUTION_MULTIPLIER + "], got [" +
qryExecutionsCntr + "]",
NODES_CNT * QUERY_EXECUTION_MULTIPLIER, qryExecutionsCntr);
}
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
index 243f5c4..185fa91 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
@@ -286,8 +286,9 @@ public class JdbcThinConnectionSelfTest extends
JdbcThinAbstractSelfTest {
*/
@Test
public void testSqlHints() throws Exception {
- try (Connection conn =
DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) {
- assertHints(conn, false, false, false, false, false, false,
affinityAwareness);
+ try (Connection conn =
DriverManager.getConnection(urlWithAffinityAwarenessFlag)) {
+ assertHints(conn, false, false, false, false, false,
+ false, affinityAwareness);
}
try (Connection conn =
DriverManager.getConnection(urlWithAffinityAwarenessFlag +
"&distributedJoins=true")) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java
index d582ede..bd4dc4b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java
@@ -107,7 +107,7 @@ public final class AffinityCache {
* @param cacheId Cache Id.
* @return Cache partitoins distribution for given cache Id or null.
*/
- UUID[] cacheDistribution(int cacheId) {
+ public UUID[] cacheDistribution(int cacheId) {
return cachePartitionsDistribution.get(cacheId);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index 065bac9..971acdf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -43,22 +43,27 @@ import java.sql.Struct;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.jdbc2.JdbcUtils;
@@ -107,6 +112,12 @@ public class JdbcThinConnection implements Connection {
/** Request timeout period. */
private static final int REQUEST_TIMEOUT_PERIOD = 1_000;
+ /** Reconnection period. */
+ public static final int RECONNECTION_DELAY = 200;
+
+ /** Reconnection maximum period. */
+ private static final int RECONNECTION_MAX_DELAY = 300_000;
+
/** Network timeout permission */
private static final String SET_NETWORK_TIMEOUT_PERM = "setNetworkTimeout";
@@ -149,15 +160,12 @@ public class JdbcThinConnection implements Connection {
/** Connection properties. */
private final ConnectionProperties connProps;
- /** Connected. */
- private volatile boolean connected;
+ /** The amount of potentially alive {@code JdbcThinTcpIo} instances -
connections to server nodes. */
+ private final AtomicInteger connCnt = new AtomicInteger();
/** Tracked statements to close on disconnect. */
private final Set<JdbcThinStatement> stmts = Collections.newSetFromMap(new
IdentityHashMap<>());
- /** Query timeout timer */
- private final Timer timer;
-
/** Affinity cache. */
private AffinityCache affinityCache;
@@ -165,10 +173,7 @@ public class JdbcThinConnection implements Connection {
private volatile JdbcThinTcpIo singleIo;
/** Node Ids tp ignite endpoints. */
- private final Map<UUID, JdbcThinTcpIo> ios = new ConcurrentHashMap<>();
-
- /** Ignite endpoints to use for better performance in case of random
access. */
- private JdbcThinTcpIo[] iosArr;
+ private final ConcurrentSkipListMap<UUID, JdbcThinTcpIo> ios = new
ConcurrentSkipListMap<>();
/** Server index. */
private int srvIdx;
@@ -188,6 +193,15 @@ public class JdbcThinConnection implements Connection {
/** Network timeout. */
private int netTimeout;
+ /** Background periodical maintenance: query timeouts and reconnection
handler. */
+ private final ScheduledExecutorService maintenanceExecutor =
Executors.newScheduledThreadPool(2);
+
+ /** Cancelable future for query timeout task. */
+ private ScheduledFuture<?> qryTimeoutScheduledFut;
+
+ /** Cancelable future for connections handler task. */
+ private ScheduledFuture<?> connectionsHndScheduledFut;
+
/**
* Creates new connection.
*
@@ -203,32 +217,30 @@ public class JdbcThinConnection implements Connection {
schema = JdbcUtils.normalizeSchema(connProps.getSchema());
- timer = new Timer("query-timeout-timer");
-
affinityAwareness = connProps.isAffinityAwareness();
ensureConnected();
+
+ if (affinityAwareness)
+ connectionsHndScheduledFut =
maintenanceExecutor.scheduleWithFixedDelay(new ConnectionHandlerTask(),
+ 0, RECONNECTION_DELAY, TimeUnit.MILLISECONDS);
}
/**
* @throws SQLException On connection error.
*/
private void ensureConnected() throws SQLException {
- if (connected)
+ if (connCnt.get() > 0)
return;
assert !closed;
assert ios.isEmpty();
- assert iosArr == null;
-
- HostAndPortRange[] srvs = connProps.getAddresses();
-
if (affinityAwareness)
- connectInAffinityAwarenessMode(srvs);
+ connectInBestEffortAffinityMode();
else
- connectInCommonMode(srvs);
+ connectInCommonMode();
}
/**
@@ -445,6 +457,10 @@ public class JdbcThinConnection implements Connection {
if (isClosed())
return;
+ closed = true;
+
+ maintenanceExecutor.shutdown();
+
if (streamState != null) {
streamState.close();
@@ -457,23 +473,17 @@ public class JdbcThinConnection implements Connection {
SQLException err = null;
- closed = true;
-
if (affinityAwareness) {
for (JdbcThinTcpIo clioIo : ios.values())
clioIo.close();
ios.clear();
-
- iosArr = null;
}
else {
if (singleIo != null)
singleIo.close();
}
- timer.cancel();
-
if (err != null)
throw err;
}
@@ -858,7 +868,7 @@ public class JdbcThinConnection implements Connection {
throws SQLException {
ensureConnected();
- RequestTimeoutTimerTask reqTimeoutTimerTask = null;
+ RequestTimeoutTask reqTimeoutTask = null;
synchronized (mux) {
if (ownThread != null) {
@@ -870,16 +880,18 @@ public class JdbcThinConnection implements Connection {
ownThread = Thread.currentThread();
}
try {
+ JdbcThinTcpIo cliIo = null;
try {
- JdbcThinTcpIo cliIo = stickyIo == null ?
cliIo(calculateNodeIds(req)) : stickyIo;
+ cliIo = (stickyIo == null || !stickyIo.connected()) ?
cliIo(calculateNodeIds(req)) : stickyIo;
if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT) {
- reqTimeoutTimerTask = new RequestTimeoutTimerTask(
+ reqTimeoutTask = new RequestTimeoutTask(
req instanceof JdbcBulkLoadBatchRequest ?
stmt.currentRequestId() : req.requestId(),
cliIo,
stmt.requestTimeout());
- timer.schedule(reqTimeoutTimerTask, 0,
REQUEST_TIMEOUT_PERIOD);
+ qryTimeoutScheduledFut =
maintenanceExecutor.scheduleAtFixedRate(reqTimeoutTask, 0,
+ REQUEST_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS);
}
JdbcQueryExecuteRequest qryReq = null;
@@ -892,13 +904,15 @@ public class JdbcThinConnection implements Connection {
txIo = res.activeTransaction() ? cliIo : null;
if (res.status() == IgniteQueryErrorCode.QUERY_CANCELED &&
stmt != null &&
- stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTimerTask
!= null && reqTimeoutTimerTask.expired.get()) {
+ stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask !=
null &&
+ reqTimeoutTask.expired.get()) {
throw new
SQLTimeoutException(QueryCancelledException.ERR_MSG,
SqlStateCode.QUERY_CANCELLED,
IgniteQueryErrorCode.QUERY_CANCELED);
}
else if (res.status() != ClientListenerResponse.STATUS_SUCCESS)
- throw new SQLException(res.error(),
IgniteQueryErrorCode.codeToSqlState(res.status()), res.status());
+ throw new SQLException(res.error(),
IgniteQueryErrorCode.codeToSqlState(res.status()),
+ res.status());
updateAffinityCache(qryReq, res);
@@ -908,16 +922,17 @@ public class JdbcThinConnection implements Connection {
throw e;
}
catch (Exception e) {
- onDisconnect();
+ onDisconnect(cliIo);
if (e instanceof SocketTimeoutException)
throw new SQLException("Connection timed out.",
SqlStateCode.CONNECTION_FAILURE, e);
else
- throw new SQLException("Failed to communicate with Ignite
cluster.", SqlStateCode.CONNECTION_FAILURE, e);
+ throw new SQLException("Failed to communicate with Ignite
cluster.",
+ SqlStateCode.CONNECTION_FAILURE, e);
}
finally {
- if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT &&
reqTimeoutTimerTask != null)
- reqTimeoutTimerTask.cancel();
+ if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT &&
reqTimeoutTask != null)
+ qryTimeoutScheduledFut.cancel(false);
}
}
finally {
@@ -932,7 +947,7 @@ public class JdbcThinConnection implements Connection {
*
* @param req Jdbc request for which we'll try to calculate node id.
* @return node UUID or null if failed to calculate.
- * @throws IOException If Exception occured during the network partiton
destribution retrieval.
+ * @throws IOException If Exception occurred during the network partition
distribution retrieval.
* @throws SQLException If Failed to calculate derived partitions.
*/
@Nullable private List<UUID> calculateNodeIds(JdbcRequest req) throws
IOException, SQLException {
@@ -982,12 +997,12 @@ public class JdbcThinConnection implements Connection {
}
/**
- * Retrieve cache destribution for specified cache Id.
+ * Retrieve cache distribution for specified cache Id.
*
* @param cacheId Cache Id.
- * @param partCnt Partitons count.
+ * @param partCnt Partitions count.
* @return Partitions cache distribution.
- * @throws IOException If Exception occured during the network partiton
destribution retrieval.
+ * @throws IOException If Exception occurred during the network partition
distribution retrieval.
*/
private UUID[] retrieveCacheDistribution(int cacheId, int partCnt) throws
IOException {
UUID[] cacheDistr = affinityCache.cacheDistribution(cacheId);
@@ -997,7 +1012,8 @@ public class JdbcThinConnection implements Connection {
JdbcResponse res;
- res = cliIo(null).sendRequest(new
JdbcCachePartitionsRequest(Collections.singleton(cacheId)), null);
+ res = cliIo(null).sendRequest(new
JdbcCachePartitionsRequest(Collections.singleton(cacheId)),
+ null);
assert res.status() == ClientListenerResponse.STATUS_SUCCESS;
@@ -1007,7 +1023,7 @@ public class JdbcThinConnection implements Connection {
affinityCache = new AffinityCache(resAffinityVer);
else if (affinityCache.version().compareTo(resAffinityVer) > 0) {
// Jdbc thin affinity cache is binded to the newer affinity
topology version, so we should ignore retrieved
- // partition destribution. Given situation might occur in case of
concurrent race and is not
+ // partition distribution. Given situation might occur in case of
concurrent race and is not
// possible in single-threaded jdbc thin client, so it's a reserve
for the future.
return null;
}
@@ -1015,7 +1031,7 @@ public class JdbcThinConnection implements Connection {
List<JdbcThinAffinityAwarenessMappingGroup> mappings =
((JdbcCachePartitionsResult)res.response()).getMappings();
- // Despite the fact that, at this moment, we request partition
destribution only for one cache,
+ // Despite the fact that, at this moment, we request partition
distribution only for one cache,
// we might retrieve multiple caches but exactly with same
distribution.
assert mappings.size() == 1;
@@ -1046,7 +1062,8 @@ public class JdbcThinConnection implements Connection {
return
derivedParts.tree().apply(partResDesc.partitionClientContext(), args);
}
catch (IgniteCheckedException e) {
- throw new SQLException("Failed to calculate derived partitions
for query.", SqlStateCode.INTERNAL_ERROR);
+ throw new SQLException("Failed to calculate derived partitions
for query.",
+ SqlStateCode.INTERNAL_ERROR);
}
}
@@ -1061,7 +1078,7 @@ public class JdbcThinConnection implements Connection {
* @throws SQLException On any error.
*/
void sendQueryCancelRequest(JdbcQueryCancelRequest req, JdbcThinTcpIo
cliIo) throws SQLException {
- if (!connected)
+ if (connCnt.get() == 0)
throw new SQLException("Failed to communicate with Ignite
cluster.", SqlStateCode.CONNECTION_FAILURE);
assert cliIo != null;
@@ -1082,7 +1099,8 @@ public class JdbcThinConnection implements Connection {
* @param stickyIO Sticky ignite endpoint.
* @throws SQLException On any error.
*/
- private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest
req, JdbcThinTcpIo stickyIO) throws SQLException {
+ private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest
req, JdbcThinTcpIo stickyIO)
+ throws SQLException {
ensureConnected();
synchronized (mux) {
@@ -1102,12 +1120,13 @@ public class JdbcThinConnection implements Connection {
throw e;
}
catch (Exception e) {
- onDisconnect();
+ onDisconnect(stickyIO);
if (e instanceof SocketTimeoutException)
throw new SQLException("Connection timed out.",
SqlStateCode.CONNECTION_FAILURE, e);
else
- throw new SQLException("Failed to communicate with Ignite
cluster.", SqlStateCode.CONNECTION_FAILURE, e);
+ throw new SQLException("Failed to communicate with Ignite
cluster.",
+ SqlStateCode.CONNECTION_FAILURE, e);
}
finally {
synchronized (mux) {
@@ -1126,24 +1145,20 @@ public class JdbcThinConnection implements Connection {
/**
* Called on IO disconnect: close the client IO and opened statements.
*/
- private void onDisconnect() {
- if (!connected)
- return;
+ private void onDisconnect(JdbcThinTcpIo cliIo) {
+ assert connCnt.get() > 0;
if (affinityAwareness) {
- for (JdbcThinTcpIo clioIo : ios.values())
- clioIo.close();
-
- ios.clear();
+ cliIo.close();
- iosArr = null;
+ ios.remove(cliIo.nodeId());
}
else {
if (singleIo != null)
singleIo.close();
}
- connected = false;
+ connCnt.decrementAndGet();
if (streamState != null) {
streamState.close0();
@@ -1157,8 +1172,6 @@ public class JdbcThinConnection implements Connection {
stmts.clear();
}
-
- timer.cancel();
}
/**
@@ -1308,7 +1321,7 @@ public class JdbcThinConnection implements Connection {
if (err0 instanceof SQLException)
throw (SQLException)err0;
else {
- onDisconnect();
+ onDisconnect(streamingStickyIo);
if (err0 instanceof SocketTimeoutException)
throw new SQLException("Connection timed out.",
SqlStateCode.CONNECTION_FAILURE, err0);
@@ -1330,7 +1343,7 @@ public class JdbcThinConnection implements Connection {
/**
*/
void close0() {
- if (connected) {
+ if (connCnt.get() > 0) {
try {
executeBatch(true);
}
@@ -1395,7 +1408,6 @@ public class JdbcThinConnection implements Connection {
* @param nodeIds Set of node's UUIDs.
* @return Ignite endpoint to use for request/response transferring.
*/
- @SuppressWarnings("ZeroLengthArrayAllocation")
private JdbcThinTcpIo cliIo(List<UUID> nodeIds) {
if (!affinityAwareness)
return singleIo;
@@ -1404,12 +1416,12 @@ public class JdbcThinConnection implements Connection {
return txIo;
if (nodeIds == null || nodeIds.isEmpty())
- return iosArr[RND.nextInt(iosArr.length)];
+ return randomIo();
JdbcThinTcpIo io = null;
if (nodeIds.size() == 1)
- io = ios.get(nodeIds.iterator().next());
+ io = ios.get(nodeIds.get(0));
else {
int initNodeId = RND.nextInt(nodeIds.size());
@@ -1427,7 +1439,42 @@ public class JdbcThinConnection implements Connection {
}
}
- return io != null ? io : iosArr[RND.nextInt(iosArr.length)];
+ return io != null ? io : randomIo();
+ }
+
+ /**
+ * Returns random tcpIo, based on random UUID, generated in a custom way
with the help of {@code Random}
+ * instead of {@code SecureRandom}. It's valid, cause cryptographically
strong pseudo
+ * random number generator is not required in this particular case. {@code
Random} is much faster
+ * than {@code SecureRandom}.
+ *
+ * @return random tcpIo
+ */
+ private JdbcThinTcpIo randomIo() {
+ byte[] randomBytes = new byte[16];
+
+ RND.nextBytes(randomBytes);
+
+ randomBytes[6] &= 0x0f; /* clear version */
+ randomBytes[6] |= 0x40; /* set to version 4 */
+ randomBytes[8] &= 0x3f; /* clear variant */
+ randomBytes[8] |= 0x80; /* set to IETF variant */
+
+ long msb = 0;
+
+ long lsb = 0;
+
+ for (int i=0; i<8; i++)
+ msb = (msb << 8) | (randomBytes[i] & 0xff);
+
+ for (int i=8; i<16; i++)
+ lsb = (lsb << 8) | (randomBytes[i] & 0xff);
+
+ UUID randomUUID = new UUID(msb, lsb);
+
+ Map.Entry<UUID, JdbcThinTcpIo> entry = ios.ceilingEntry(randomUUID);
+
+ return entry != null ? entry.getValue() :
ios.floorEntry(randomUUID).getValue();
}
/**
@@ -1457,10 +1504,11 @@ public class JdbcThinConnection implements Connection {
* Establishes a connection to ignite endpoint, trying all specified hosts
and ports one by one.
* Stops as soon as any connection is established.
*
- * @param srvs Ignite endpoints addresses.
* @throws SQLException If failed to connect to ignite cluster.
*/
- private void connectInCommonMode(HostAndPortRange[] srvs) throws
SQLException {
+ private void connectInCommonMode() throws SQLException {
+ HostAndPortRange[] srvs = connProps.getAddresses();
+
List<Exception> exceptions = null;
for (int i = 0; i < srvs.length; i++) {
@@ -1481,7 +1529,7 @@ public class JdbcThinConnection implements Connection {
singleIo = cliIo;
- connected = true;
+ connCnt.incrementAndGet();
return;
}
@@ -1513,7 +1561,7 @@ public class JdbcThinConnection implements Connection {
* @throws SQLException Umbrella exception.
*/
private void handleConnectExceptions(List<Exception> exceptions) throws
SQLException {
- if (!connected && exceptions != null) {
+ if (connCnt.get() == 0 && exceptions != null) {
close();
if (exceptions.size() == 1) {
@@ -1540,18 +1588,16 @@ public class JdbcThinConnection implements Connection {
* Establishes a connection to ignite endpoint, trying all specified hosts
and ports one by one.
* Stops as soon as all iosArr are established.
*
- * @param srvs Ignite endpoints addresses.
* @throws SQLException If failed to connect to at least one ignite
endpoint,
* or if endpoints versions are not the same.
*/
- @SuppressWarnings("ZeroLengthArrayAllocation")
- private void connectInAffinityAwarenessMode(HostAndPortRange[] srvs)
throws SQLException {
+ private void connectInBestEffortAffinityMode() throws SQLException {
List<Exception> exceptions = null;
- IgniteProductVersion prevIgniteEnpointVer = null;
+ IgniteProductVersion prevIgniteEndpointVer = null;
- for (int i = 0; i < srvs.length; i++) {
- HostAndPortRange srv = srvs[i];
+ for (int i = 0; i < connProps.getAddresses().length; i++) {
+ HostAndPortRange srv = connProps.getAddresses()[i];
try {
InetAddress[] addrs = InetAddress.getAllByName(srv.host());
@@ -1563,14 +1609,18 @@ public class JdbcThinConnection implements Connection {
new JdbcThinTcpIo(connProps, new
InetSocketAddress(addr, port), 0);
if (!cliIo.isAffinityAwarenessSupported()) {
+ cliIo.close();
+
throw new SQLException("Failed to connect to
Ignite node [url=" +
connProps.getUrl() + "]. address = [" +
addr + ':' + port + "]." +
- "Node doesn't support best affort affinity
mode.",
+ "Node doesn't support affinity awareness
mode.",
SqlStateCode.INTERNAL_ERROR);
}
- if (prevIgniteEnpointVer != null &&
!prevIgniteEnpointVer.equals(cliIo.igniteVersion())) {
+ if (prevIgniteEndpointVer != null &&
!prevIgniteEndpointVer.equals(cliIo.igniteVersion())) {
// TODO: 13.02.19 IGNITE-11321 JDBC Thin:
implement nodes multi version support.
+ cliIo.close();
+
throw new SQLException("Failed to connect to
Ignite node [url=" +
connProps.getUrl() + "]. address = [" +
addr + ':' + port + "]." +
"Different versions of nodes are not
supported in affinity awareness mode.",
@@ -1579,17 +1629,18 @@ public class JdbcThinConnection implements Connection {
cliIo.timeout(netTimeout);
- JdbcThinTcpIo ioToSameNode =
ios.get(cliIo.nodeId());
+ JdbcThinTcpIo ioToSameNode =
ios.putIfAbsent(cliIo.nodeId(), cliIo);
- // This can happen if the same node has several
IPs.
+ // This can happen if the same node has several
IPs or if connection manager background
+ // timer task runs concurrently.
if (ioToSameNode != null)
- ioToSameNode.close();
-
- ios.put(cliIo.nodeId(), cliIo);
+ cliIo.close();
+ else
+ connCnt.incrementAndGet();
- connected = true;
+ prevIgniteEndpointVer = cliIo.igniteVersion();
- prevIgniteEnpointVer = cliIo.igniteVersion();
+ return;
}
catch (Exception exception) {
if (exceptions == null)
@@ -1609,14 +1660,49 @@ public class JdbcThinConnection implements Connection {
}
handleConnectExceptions(exceptions);
+ }
+
+ /**
+ * Recreates affinity cache if affinity topology version was changed and
adds partition result to sql cache.
+ *
+ * @param qryReq Query request.
+ * @param res Jdbc Response.
+ */
+ private void updateAffinityCache(JdbcQueryExecuteRequest qryReq,
JdbcResponse res) {
+ if (affinityAwareness) {
+ AffinityTopologyVersion resAffVer = res.affinityVersion();
+
+ if (resAffVer != null && (affinityCache == null ||
affinityCache.version().compareTo(resAffVer) < 0))
+ affinityCache = new AffinityCache(resAffVer);
+
+ // Partition result was requested.
+ if (res.response() instanceof JdbcQueryExecuteResult &&
qryReq.partitionResponseRequest()) {
+ PartitionResult partRes =
((JdbcQueryExecuteResult)res.response()).partitionResult();
+
+ if (partRes == null ||
affinityCache.version().equals(partRes.topologyVersion())) {
+ int cacheId = (partRes != null && partRes.tree() != null) ?
+ GridCacheUtils.cacheId(partRes.cacheName()) :
+ -1;
+
+ PartitionClientContext partClientCtx = partRes != null ?
+ new PartitionClientContext(partRes.partitionsCount()) :
+ null;
+
+ QualifiedSQLQuery qry = new
QualifiedSQLQuery(qryReq.schemaName(), qryReq.sqlQuery());
- iosArr = ios.values().toArray(new JdbcThinTcpIo[0]);
+ JdbcThinPartitionResultDescriptor partResDescr =
+ new JdbcThinPartitionResultDescriptor(partRes,
cacheId, partClientCtx);
+
+ affinityCache.addSqlQuery(qry, partResDescr);
+ }
+ }
+ }
}
/**
- * Request Timeout Timer Task
+ * Request Timeout Task
*/
- private class RequestTimeoutTimerTask extends TimerTask {
+ private class RequestTimeoutTask implements Runnable {
/** Request id. */
private final long reqId;
@@ -1633,7 +1719,7 @@ public class JdbcThinConnection implements Connection {
* @param reqId Request Id to cancel in case of timeout
* @param initReqTimeout Initial request timeout
*/
- RequestTimeoutTimerTask(long reqId, JdbcThinTcpIo stickyIO, int
initReqTimeout) {
+ RequestTimeoutTask(long reqId, JdbcThinTcpIo stickyIO, int
initReqTimeout) {
this.reqId = reqId;
this.stickyIO = stickyIO;
@@ -1651,7 +1737,9 @@ public class JdbcThinConnection implements Connection {
sendQueryCancelRequest(new JdbcQueryCancelRequest(reqId),
stickyIO);
- cancel();
+ qryTimeoutScheduledFut.cancel(false);
+
+ return;
}
remainingQryTimeout -= REQUEST_TIMEOUT_PERIOD;
@@ -1660,45 +1748,155 @@ public class JdbcThinConnection implements Connection {
LOG.log(Level.WARNING,
"Request timeout processing failure: unable to cancel
request [reqId=" + reqId + ']', e);
- cancel();
+ qryTimeoutScheduledFut.cancel(false);
}
}
}
/**
- * Recreates affinity cache if affinity topology version was changed and
adds partition result to sql cache.
- *
- * @param qryReq Query request.
- * @param res Jdbc Response.
+ * Connection Handler Task
*/
- private void updateAffinityCache(JdbcQueryExecuteRequest qryReq,
JdbcResponse res) {
- if (affinityAwareness) {
- AffinityTopologyVersion resAffVer = res.affinityVersion();
+ private class ConnectionHandlerTask implements Runnable {
+ /** Map with reconnection delays. */
+ private Map<InetSocketAddress, Integer> reconnectionDelays = new
HashMap<>();
- if (resAffVer != null && (affinityCache == null ||
affinityCache.version().compareTo(resAffVer) < 0))
- affinityCache = new AffinityCache(resAffVer);
+ /** Map with reconnection delays remainder. */
+ private Map<InetSocketAddress, Integer> reconnectionDelaysRemainder =
new HashMap<>();
- // Partition result was requested.
- if (res.response() instanceof JdbcQueryExecuteResult &&
qryReq.partitionResponseRequest()) {
- PartitionResult partRes =
((JdbcQueryExecuteResult)res.response()).partitionResult();
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ for (Map.Entry<InetSocketAddress, Integer> delayEntry :
reconnectionDelaysRemainder.entrySet())
+ reconnectionDelaysRemainder.put(delayEntry.getKey(),
delayEntry.getValue() - RECONNECTION_DELAY);
- if (partRes == null ||
affinityCache.version().equals(partRes.topologyVersion())) {
- int cacheId = (partRes != null && partRes.tree() != null) ?
- GridCacheUtils.cacheId(partRes.cacheName()) :
- -1;
+ Set<InetSocketAddress> aliveSockAddrs =
+
ios.values().stream().map(JdbcThinTcpIo::socketAddress).collect(Collectors.toSet());
- PartitionClientContext partClientCtx = partRes != null ?
- new PartitionClientContext(partRes.partitionsCount()) :
- null;
+ IgniteProductVersion prevIgniteEndpointVer = null;
- QualifiedSQLQuery qry = new
QualifiedSQLQuery(qryReq.schemaName(), qryReq.sqlQuery());
+ for (int i = 0; i < connProps.getAddresses().length; i++) {
+ HostAndPortRange srv = connProps.getAddresses()[i];
- JdbcThinPartitionResultDescriptor partResDescr =
- new JdbcThinPartitionResultDescriptor(partRes,
cacheId, partClientCtx);
+ try {
+ InetAddress[] addrs =
InetAddress.getAllByName(srv.host());
- affinityCache.addSqlQuery(qry, partResDescr);
+ for (InetAddress addr : addrs) {
+ for (int port = srv.portFrom(); port <=
srv.portTo(); ++port) {
+ InetSocketAddress sockAddr = null;
+
+ try {
+ sockAddr = new InetSocketAddress(addr,
port);
+
+ if (aliveSockAddrs.contains(sockAddr)) {
+
reconnectionDelaysRemainder.remove(sockAddr);
+ reconnectionDelays.remove(sockAddr);
+
+ continue;
+ }
+
+ Integer delayRemainder =
reconnectionDelaysRemainder.get(sockAddr);
+
+ if (delayRemainder != null &&
delayRemainder != 0)
+ continue;
+
+ if (closed) {
+ maintenanceExecutor.shutdown();
+
+ return;
+ }
+
+ JdbcThinTcpIo cliIo =
+ new JdbcThinTcpIo(connProps, new
InetSocketAddress(addr, port), 0);
+
+ if (!cliIo.isAffinityAwarenessSupported())
{
+ processDelay(sockAddr);
+
+ LOG.log(Level.WARNING, "Failed to
connect to Ignite node [url=" +
+ connProps.getUrl() + "]. address =
[" + addr + ':' + port + "]." +
+ "Node doesn't support best effort
affinity mode.");
+
+ cliIo.close();
+
+ continue;
+ }
+
+ if (prevIgniteEndpointVer != null &&
+
!prevIgniteEndpointVer.equals(cliIo.igniteVersion())) {
+ processDelay(sockAddr);
+
+ LOG.log(Level.WARNING, "Failed to
connect to Ignite node [url=" +
+ connProps.getUrl() + "]. address =
[" + addr + ':' + port + "]." +
+ "Different versions of nodes are
not supported in best " +
+ "effort affinity mode.");
+
+ cliIo.close();
+
+ continue;
+ }
+
+ cliIo.timeout(netTimeout);
+
+ JdbcThinTcpIo ioToSameNode =
ios.putIfAbsent(cliIo.nodeId(), cliIo);
+
+ // This can happen if the same node has
several IPs or if ensureConnected() runs
+ // concurrently
+ if (ioToSameNode != null)
+ cliIo.close();
+ else
+ connCnt.incrementAndGet();
+
+ prevIgniteEndpointVer =
cliIo.igniteVersion();
+
+ if (closed) {
+ maintenanceExecutor.shutdown();
+
+ cliIo.close();
+
+ ios.remove(cliIo.nodeId());
+
+ return;
+ }
+ }
+ catch (Exception exception) {
+ if (sockAddr != null)
+ processDelay(sockAddr);
+
+ LOG.log(Level.WARNING, "Failed to connect
to Ignite node [url=" +
+ connProps.getUrl() + "]. address = ["
+ addr + ':' + port + "].");
+ }
+ }
+ }
+ }
+ catch (Exception exception) {
+ LOG.log(Level.WARNING, "Failed to connect to Ignite
node [url=" +
+ connProps.getUrl() + "]. server = [" + srv + "].");
+ }
}
}
+ catch (Exception e) {
+ LOG.log(Level.WARNING, "Connection handler processing failure.
Reconnection processes was stopped."
+ , e);
+
+ connectionsHndScheduledFut.cancel(false);
+ }
+ }
+
+ /**
+ * Increase reconnection delay if needed and store it to corresponding
maps.
+ *
+ * @param sockAddr Socket address.
+ */
+ private void processDelay(InetSocketAddress sockAddr) {
+ Integer delay = reconnectionDelays.get(sockAddr);
+
+ delay = delay == null ? RECONNECTION_DELAY : delay * 2;
+
+ if (delay > RECONNECTION_MAX_DELAY)
+ delay = RECONNECTION_MAX_DELAY;
+
+ reconnectionDelays.put(sockAddr, delay);
+
+ reconnectionDelaysRemainder.put(sockAddr, delay);
}
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 366be79..7663a80 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -122,7 +122,7 @@ public class JdbcThinTcpIo {
private final BufferedInputStream in;
/** Connected flag. */
- private boolean connected;
+ private volatile boolean connected;
/** Ignite server version. */
private final IgniteProductVersion igniteVer;
@@ -422,10 +422,9 @@ public class JdbcThinTcpIo {
JdbcResponse resp = readResponse();
- if (stmt != null && stmt.isCancelled())
- return new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED,
QueryCancelledException.ERR_MSG);
- else
- return resp;
+ return stmt != null && stmt.isCancelled() ?
+ new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED,
QueryCancelledException.ERR_MSG) :
+ resp;
}
/**
@@ -650,4 +649,18 @@ public class JdbcThinTcpIo {
public UUID nodeId() {
return nodeId;
}
+
+ /**
+ * @return Socket address.
+ */
+ public InetSocketAddress socketAddress() {
+ return sockAddr;
+ }
+
+ /**
+ * @return Connected flag.
+ */
+ public boolean connected() {
+ return connected;
+ }
}