This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-11320 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 44132fbf53c71acff5e2098606e0289c5bbf6d1a Author: alapin <[email protected]> AuthorDate: Wed Jun 5 15:47:35 2019 +0300 IGNITE-11320: Support for individual reconnect in case of best effort affinity mode added. --- ...teJdbcThinDriverAffinityAwarenessTestSuite.java | 2 + ...cThinAffinityAwarenessReconnectionSelfTest.java | 397 +++++++++++++++++++ .../thin/JdbcThinAffinityAwarenessSelfTest.java | 104 ++--- .../jdbc/thin/JdbcThinConnectionSelfTest.java | 5 +- .../ignite/internal/jdbc/thin/AffinityCache.java | 2 +- .../internal/jdbc/thin/JdbcThinConnection.java | 426 +++++++++++++++------ .../ignite/internal/jdbc/thin/JdbcThinTcpIo.java | 23 +- 7 files changed, 765 insertions(+), 194 deletions(-) diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java index 888d65e..3937fe2 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.jdbc.suite; import org.apache.ignite.jdbc.thin.JdbcThinAbstractSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessReconnectionSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessTransactionsSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinConnectionSelfTest; @@ -38,6 +39,7 @@ import org.junit.runners.Suite; JdbcThinStatementSelfTest.class, JdbcThinAffinityAwarenessSelfTest.class, JdbcThinAffinityAwarenessTransactionsSelfTest.class, + JdbcThinAffinityAwarenessReconnectionSelfTest.class, }) public class IgniteJdbcThinDriverAffinityAwarenessTestSuite { /** diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java new file mode 100644 index 0000000..f612b5b --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java @@ -0,0 +1,397 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.jdbc.thin.JdbcThinConnection; +import org.apache.ignite.internal.jdbc.thin.JdbcThinTcpIo; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +/** + * Jdbc thin affinity awareness reconnection test. + */ +public class JdbcThinAffinityAwarenessReconnectionSelfTest extends JdbcThinAbstractSelfTest { + /** URL. 
*/ + private static final String URL = "jdbc:ignite:thin://127.0.0.1:10800..10802?affinityAwareness=true"; + + /** Nodes count. */ + private static final int INITIAL_NODES_CNT = 3; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(INITIAL_NODES_CNT); + } + + /** + * Check that background connection establishment works as expected. + * <p> + * Within new reconnection logic in affinity awareness mode when {@code JdbcThinConnection} is created + * it eagerly establishes a connection to one and only one ignite node. All other connections to nodes specified in + * connection properties are established by background thread. + * <p> + * So in given test we specify url with 3 different ports and verify that 3 connections will be created: + * one in eager mode and two within background thread. It takes some time for background thread to create + * a connection, and cause, in addition to that it runs periodically with some delay, + * {@code GridTestUtils.waitForCondition} is used in order to check that all expected connections are established. + * + * @throws Exception If failed. + */ + @Test + public void testBackgroundConnectionEstablishment() throws Exception { + try (Connection conn = DriverManager.getConnection(URL)) { + Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertConnectionsCount(ios, 3); + } + } + + /** + * Test connection failover: + * <ol> + * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li> + * <li>Stop one node, invalidate dead connection (jdbc thin, won't detect that node has gone, + * until it tries to touch it) and verify, that connections count has decremented. </li> + * <li>Start, previously stopped node, and check that connections count also restored to initial value.</li> + * </ol> + * + * @throws Exception If failed. 
+ */ + @Test + public void testConnectionFailover() throws Exception { + try (Connection conn = DriverManager.getConnection(URL)) { + Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + + assertEquals("Unexpected connections count.", INITIAL_NODES_CNT, ios.size()); + + stopGrid(1); + + invalidateConnectionToStoppedNode(conn); + + assertEquals("Unexpected connections count.", INITIAL_NODES_CNT - 1, ios.size()); + + startGrid(1); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + } + } + + /** + * Test total connection failover: + * <ol> + * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li> + * <li>Stop all nodes, invalidate dead connections (jdbc thin, won't detect that node has gone, + * until it tries to touch it) and verify, that connections count equals to zero. </li> + * <li>Start, previously stopped nodes, and check that connections count also restored to initial value.</li> + * </ol> + * + * @throws Exception If failed. + */ + @Test + public void testTotalConnectionFailover() throws Exception { + try(Connection conn = DriverManager.getConnection(URL)) { + Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + + for (int i = 0; i < INITIAL_NODES_CNT; i++) { + stopGrid(i); + invalidateConnectionToStoppedNode(conn); + } + + assertConnectionsCount(ios, 0); + + for (int i = 0; i < INITIAL_NODES_CNT; i++) + startGrid(i); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + } + } + + /** + * Test eager connection failover: + * <ol> + * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li> + * <li>Stop all nodes, invalidate dead connections (jdbc thin, won't detect that node has gone, + * until it tries to touch it) and verify, that connections count equals to zero. 
</li> + * <li>Wait for some time, in order for reconnection thread to increase delay between connection attempts, + * because of reconnection failures.</li> + * <li>Start, previously stopped nodes, and send simple query immediately. Eager reconnection is expected. + * <b>NOTE</b>:There's still a chance that connection would be recreated by background thread and not eager process. + * In order to decrease given possibility we've waited for some time on previous step.</li> + * <li>Ensure that after some time all connections will be restored.</li> + * </ol> + * + * @throws Exception If failed. + */ + @Test + public void testEagerConnectionFailover() throws Exception { + try(Connection conn = DriverManager.getConnection(URL)) { + Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + + for (int i = 0; i < INITIAL_NODES_CNT; i++) { + stopGrid(i); + invalidateConnectionToStoppedNode(conn); + } + + assertEquals("Unexpected connections count.", 0, ios.size()); + + doSleep(4 * JdbcThinConnection.RECONNECTION_DELAY); + + for (int i = 0; i < INITIAL_NODES_CNT; i++) + startGrid(i); + + conn.createStatement().execute("select 1;"); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + } + } + + /** + * Check that reconnection thread increases delay between unsuccessful connection attempts: + * <ol> + * <li>Specify two inet addresses one valid and one inoperative.</li> + * <li>Wait for specific amount of time. The reconnection logic suppose to increase delays between reconnection + * attempts. 
The basic idea is very simple: delay is doubled on every connection failure until connection succeeds + * or until delay exceeds predefined maximum value {@code JdbcThinConnection.RECONNECTION_MAX_DELAY} + * <pre> + * |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _| + * where: '|' is connection attempt; + * '_' is an amount of time that reconnection thread waits, equal to JdbcThinConnection.RECONNECTION_DELAY; + * + * so if we wait for 9 * RECONNECTION_DELAY, we expect to see exactly four connection attempts: + * |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _| + * </pre> + * </li> + * <li>Check that there were exactly four reconnection attempts. In order to do this, we check logs, expecting to see + * four warning messages there.</li> + * </ol> + * + * @throws Exception If failed. + */ + @Test + public void testReconnectionDelayIncreasing() throws Exception { + Logger log = Logger.getLogger(JdbcThinConnection.class.getName()); + LogHandler hnd = new LogHandler(); + hnd.setLevel(Level.ALL); + log.setUseParentHandlers(false); + log.addHandler(hnd); + log.setLevel(Level.ALL); + + try (Connection ignored = DriverManager.getConnection( + "jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810?affinityAwareness=true")) { + hnd.records.clear(); + + doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); + + assertEquals("Unexpected log records count.", 4, hnd.records.size()); + + String expRecordMsg = "Failed to connect to Ignite node " + + "[url=jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810]. address = [localhost/127.0.0.1:10810]."; + + for (LogRecord record: hnd.records) { + assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage()); + assertEquals("Unexpected log level", Level.WARNING, record.getLevel()); + } + } + } + + /** + * Check that reconnection thread selectively increases delay between unsuccessful connection attempts: + * <ol> + * <li>Create {@code JdbcThinConnection} with two valid inet addresses.</li> + * <li>Stop one node and invalidate corresponding connection.
Ensure that only one connection is left.</li> + * <li>Wait for specific amount of time. The reconnection logic is supposed to increase delays between reconnection + * attempts. The basic idea is very simple: delay is doubled on every connection failure until connection succeeds + * or until delay exceeds predefined maximum value {@code JdbcThinConnection.RECONNECTION_MAX_DELAY} + * <pre> + * |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _| + * where: '|' is connection attempt; + * '_' is an amount of time that reconnection thread waits, equal to JdbcThinConnection.RECONNECTION_DELAY; + * + * so if we wait for 9 * RECONNECTION_DELAY, we expect to see exactly four connection attempts: + * |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _| + * </pre> + * </li> + * <li>Check that there were exactly four reconnection attempts. In order to do this, we check logs, expecting to see + * four warning messages there.</li> + * <li>Start previously stopped node.</li> + * <li>Wait until next reconnection attempt.</li> + * <li>Check that both connections are established and that there are no warning messages within logs.</li> + * <li>One more time: stop one node and invalidate corresponding connection. + * Ensure that only one connection is left.</li> + * <li>Wait for some time.</li> + * <li>Ensure that delay between reconnection was reset to initial value. + * In other words, we again expect four warning messages within logs.</li> + * </ol> + * + * @throws Exception If failed. + */ + @Test + public void testReconnectionDelaySelectiveIncreasing() throws Exception { + Logger log = Logger.getLogger(JdbcThinConnection.class.getName()); + LogHandler hnd = new LogHandler(); + hnd.setLevel(Level.ALL); + log.setUseParentHandlers(false); + log.addHandler(hnd); + log.setLevel(Level.ALL); + + try (Connection conn = DriverManager.getConnection( + "jdbc:ignite:thin://127.0.0.1:10800..10801?affinityAwareness=true")) { + // Stop one node and invalidate corresponding connection. Ensure that only one connection left.
+ stopGrid(0); + + invalidateConnectionToStoppedNode(conn); + + Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertEquals("Unexpected connections count.", 1, ios.size()); + + hnd.records.clear(); + + // Wait for some specific amount of time and ensure that there were exact four reconnection attempts. + doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); + + assertEquals("Unexpected log records count.", 4, hnd.records.size()); + + String expRecordMsg = "Failed to connect to Ignite node [url=jdbc:ignite:thin://127.0.0.1:10800..10801]." + + " address = [localhost/127.0.0.1:10800]."; + + for (LogRecord record: hnd.records) { + assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage()); + assertEquals("Unexpected log level", Level.WARNING, record.getLevel()); + } + + // Start previously stopped node. + startGrid(0); + + hnd.records.clear(); + + // Waiting until next reconnection attempt. + doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); + + // Checking that both connections are established and that there are no warning messages within logs. + assertEquals("Unexpected log records count.", 0, hnd.records.size()); + + assertEquals("Unexpected connections count.", 2, ios.size()); + + // One more time: stop one node, invalidate corresponding connection and ensure that only one connection + // left. + stopGrid(0); + + invalidateConnectionToStoppedNode(conn); + + assertEquals("Unexpected connections count.", 1, ios.size()); + + hnd.records.clear(); + + // Wait for some time and ensure that delay between reconnection was reset to initial value. 
+ doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); + + assertEquals("Unexpected log records count.", 4, hnd.records.size()); + + for (LogRecord record: hnd.records) { + assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage()); + assertEquals("Unexpected log level", Level.WARNING, record.getLevel()); + } + + startGrid(0); + } + } + + /** + * Assert connections count. + * + * @param ios Map that holds connections. + * @param expConnCnt Expected connections count. + */ + private void assertConnectionsCount(Map<UUID, JdbcThinTcpIo> ios, int expConnCnt) + throws IgniteInterruptedCheckedException { + boolean allConnectionsEstablished = GridTestUtils.waitForCondition(() -> ios.size() == expConnCnt, + 10_000); + + assertTrue("Unexpected connections count.", allConnectionsEstablished); + } + + /** + * Invalidate connection to stopped node. Jdbc thin, won't detect that node has gone, until it tries to touch it. + * So sending simple query to randomly chosen connection(socket), sooner or later, will touch dead one, + * and thus invalidate it. + * + * @param conn Connections. + */ + private void invalidateConnectionToStoppedNode(Connection conn) { + while (true) { + try (Statement stmt = conn.createStatement()) { + stmt.execute("select 1"); + } + catch (SQLException e) { + return; + } + } + } + + /** + * Simple {@code java.util.logging.Handler} implementation in order to check log records + * generated by {@code JdbcThinConnection}. + */ + static class LogHandler extends Handler { + + /** Log records. */ + private final List<LogRecord> records = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override public void publish(LogRecord record) { + records.add(record); + } + + /** {@inheritDoc} */ + @Override + public void close() { + } + + /** {@inheritDoc} */ + @Override + public void flush() { + } + + /** + * @return Records. 
+ */ + @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") public List<LogRecord> records() { + return records; + } + } +} diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java index 22e5d0d..f0e632c 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java @@ -30,7 +30,6 @@ import java.util.Collections; import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.stream.Collectors; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.affinity.AffinityFunction; @@ -133,14 +132,14 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest */ @Test public void testExecuteQueries() throws Exception { - checkNodesUsage(null, "select * from Person where _key = 1", 1, 1, + checkNodesUsage(null, stmt, "select * from Person where _key = 1", 1, 1, false); - checkNodesUsage(null, "select * from Person where _key = 1 or _key = 2", 2, + checkNodesUsage(null, stmt, "select * from Person where _key = 1 or _key = 2", 2, 2, false); - checkNodesUsage(null, "select * from Person where _key in (1, 2)", 2, 2, - false); + checkNodesUsage(null, stmt, "select * from Person where _key in (1, 2)", 2, + 2, false); } /** @@ -155,7 +154,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest ps.setInt(1, 2); - checkNodesUsage(ps, null, 1, 1, false); + checkNodesUsage(ps, null, null, 1, 1, false); // Use case 2. ps = conn.prepareStatement("select * from Person where _key = ? 
or _key = ?"); @@ -164,7 +163,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest ps.setInt(2, 2); - checkNodesUsage(ps, null, 2, 2, false); + checkNodesUsage(ps, null, null, 2, 2, false); // Use case 3. ps = conn.prepareStatement("select * from Person where _key in (?, ?)"); @@ -173,7 +172,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest ps.setInt(2, 2); - checkNodesUsage(ps, null, 2, 2, false); + checkNodesUsage(ps, null, null, 2, 2, false); } /** @@ -183,13 +182,13 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest */ @Test public void testUpdateQueries() throws Exception { - checkNodesUsage(null, "update Person set firstName = 'TestFirstName' where _key = 1", + checkNodesUsage(null, stmt, "update Person set firstName = 'TestFirstName' where _key = 1", 1, 1, true); - checkNodesUsage(null, "update Person set firstName = 'TestFirstName' where _key = 1 or _key = 2", + checkNodesUsage(null, stmt, "update Person set firstName = 'TestFirstName' where _key = 1 or _key = 2", 2, 2, true); - checkNodesUsage(null, "update Person set firstName = 'TestFirstName' where _key in (1, 2)", + checkNodesUsage(null, stmt, "update Person set firstName = 'TestFirstName' where _key in (1, 2)", 2, 2, true); } @@ -206,7 +205,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest ps.setInt(1, 2); - checkNodesUsage(ps, null, 1, 1, true); + checkNodesUsage(ps, null, null, 1, 1, true); // Use case 2. ps = conn.prepareStatement("update Person set firstName = 'TestFirstName' where _key = ? or _key = ?"); @@ -215,7 +214,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest ps.setInt(2, 2); - checkNodesUsage(ps, null, 2, 2, true); + checkNodesUsage(ps, null, null, 2, 2, true); // Use case 3. 
ps = conn.prepareStatement("update Person set firstName = 'TestFirstName' where _key in (?, ?)"); @@ -224,7 +223,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest ps.setInt(2, 2); - checkNodesUsage(ps, null, 2, 2, true); + checkNodesUsage(ps, null, null, 2, 2, true); } /** @@ -235,12 +234,12 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest @Test public void testDeleteQueries() throws Exception { // In case of simple query like "delete from Person where _key = 1" fast update logic is used, - // so parition result is not calculated on the server side - nothing to check. + // so partition result is not calculated on the server side - nothing to check. - checkNodesUsage(null, "delete from Person where _key = 10000 or _key = 20000", + checkNodesUsage(null, stmt, "delete from Person where _key = 10000 or _key = 20000", 2, 0, true); - checkNodesUsage(null, "delete from Person where _key in (10000, 20000)", + checkNodesUsage(null, stmt, "delete from Person where _key in (10000, 20000)", 2, 0, true); } @@ -252,7 +251,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest @Test public void testDeleteParametrizedQueries() throws Exception { // In case of simple query like "delete from Person where _key = ?" fast update logic is used, - // so parition result is not calculated on the server side - nothing to check. + // so partition result is not calculated on the server side - nothing to check. // Use case 1. PreparedStatement ps = conn.prepareStatement("delete from Person where _key = ? or _key = ?"); @@ -261,7 +260,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest ps.setInt(2, 2000); - checkNodesUsage(ps, null, 2, 0, true); + checkNodesUsage(ps, null, null, 2, 0, true); // Use case 2. 
ps = conn.prepareStatement("delete from Person where _key in (?, ?)"); @@ -270,7 +269,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest ps.setInt(2, 2000); - checkNodesUsage(ps, null, 2, 0, true); + checkNodesUsage(ps, null, null, 2, 0, true); } /** @@ -352,14 +351,14 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest fillCache(cacheName); - checkNodesUsage(null, + checkNodesUsage(null, stmt, "select * from \"" + cacheName + "\".Person where _key = 1", 1, 1, false); } /** * Check that affinity cache is invalidated in case of changing topology, - * detected during partions destribution retrieval. + * detected during partitions distribution retrieval. * * @throws Exception If failed. */ @@ -483,7 +482,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest * @throws Exception If failed. */ @Test - public void testAffinityCacheStoresSchemaBindedQuries() throws Exception { + public void testAffinityCacheStoresSchemaBindedQueries() throws Exception { final String cacheName = "yacc"; CacheConfiguration<Object, Object> cache = prepareCacheConfig(cacheName); @@ -515,12 +514,12 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest } /** - * Check that affinity cache stores compacted version of partitoins destributions. + * Check that affinity cache stores compacted version of partitions distributions. * * @throws Exception If failed. 
*/ @Test - public void testAffinityCacheCompactsPartitonDestributions() throws Exception { + public void testAffinityCacheCompactsPartitionDistributions() throws Exception { final String cacheName = "yaccc"; CacheConfiguration<Object, Object> cache = prepareCacheConfig(cacheName); @@ -546,56 +545,16 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest assertEquals("Sql sub-cache of affinity cache has unexpected number of elements.", 2, sqlCache.size()); - assertEquals("Partitions destribution sub-cache of affinity cache has unexpected number of elements.", + assertEquals("Partitions distribution sub-cache of affinity cache has unexpected number of elements.", 2, cachePartitionsDistribution.size()); - // Main assertition of the test: we are checking that partitions destributions for different caches + // Main assertion of the test: we are checking that partitions distributions for different caches // are equal in therms of (==) assertTrue("Partitions distributions are not the same.", cachePartitionsDistribution.get(0) == cachePartitionsDistribution.get(1)); } /** - * Check that affinity awareness works fine after reconnection. - * - * @throws Exception If failed. 
- */ - @Test - public void testReconnect() throws Exception { - checkNodesUsage(null, "select * from Person where _key = 3", 1, 1, - false); - - startGrid(7); - - for(int i = 0; i < NODES_CNT; i++) - stopGrid(i); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - stmt.execute("select * from Person where _key = 3"); - - return null; - } - }, SQLException.class, "Failed to communicate with Ignite cluster."); - - for(int i = 0; i < NODES_CNT; i++) - startGrid(i); - - stopGrid(4); - stopGrid(5); - stopGrid(6); - stopGrid(7); - - stmt = conn.createStatement(); - - // We need this extra query to invalidate obsolete affinity cache - stmt.execute("select * from Person where _key = 3"); - - checkNodesUsage(null, "select * from Person where _key = 3", 1, 1, - false); - } - - /** * Prepares default cache configuration with given name. * * @param cacheName Cache name. @@ -607,6 +566,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest cache.setName(cacheName); cache.setCacheMode(PARTITIONED); + cache.setBackups(1); cache.setIndexedTypes( Integer.class, Person.class ); @@ -615,8 +575,8 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest } /** - * Utitlity method that executes given query and verifies that expeted number of records was returned. - * Besides that given method verified that partitoin result for corresponding query is null. + * Utility method that executes given query and verifies that expected number of records was returned. + * Besides that given method verified that partition result for corresponding query is null. * * @param sqlQry Sql query. * @param expRowsCnt Expected rows count. @@ -656,8 +616,8 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest * @param dml Flag that signals whether we execute dml or not. * @throws Exception If failed. 
*/ - private void checkNodesUsage(PreparedStatement ps, String sql, int maxNodesUsedCnt, int expRowsCnt, boolean dml) - throws Exception { + private void checkNodesUsage(PreparedStatement ps, Statement stmt, String sql, int maxNodesUsedCnt, int expRowsCnt, + boolean dml) throws Exception { // Warm up an affinity cache. if (ps != null) if (dml) @@ -729,7 +689,7 @@ public class JdbcThinAffinityAwarenessSelfTest extends JdbcThinAbstractSelfTest "], got [" + nonEmptyMetricsCntr + "]", nonEmptyMetricsCntr > 0 && nonEmptyMetricsCntr <= maxNodesUsedCnt); - assertEquals("Executions count doesn't match expeted value: expected [" + + assertEquals("Executions count doesn't match expected value: expected [" + NODES_CNT * QUERY_EXECUTION_MULTIPLIER + "], got [" + qryExecutionsCntr + "]", NODES_CNT * QUERY_EXECUTION_MULTIPLIER, qryExecutionsCntr); } diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java index 243f5c4..185fa91 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java @@ -286,8 +286,9 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { */ @Test public void testSqlHints() throws Exception { - try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { - assertHints(conn, false, false, false, false, false, false, affinityAwareness); + try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag)) { + assertHints(conn, false, false, false, false, false, + false, affinityAwareness); } try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag + "&distributedJoins=true")) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java index d582ede..bd4dc4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java @@ -107,7 +107,7 @@ public final class AffinityCache { * @param cacheId Cache Id. * @return Cache partitoins distribution for given cache Id or null. */ - UUID[] cacheDistribution(int cacheId) { + public UUID[] cacheDistribution(int cacheId) { return cachePartitionsDistribution.get(cacheId); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java index 065bac9..971acdf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java @@ -43,22 +43,27 @@ import java.sql.Struct; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; 
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.jdbc2.JdbcUtils; @@ -107,6 +112,12 @@ public class JdbcThinConnection implements Connection { /** Request timeout period. */ private static final int REQUEST_TIMEOUT_PERIOD = 1_000; + /** Reconnection period. */ + public static final int RECONNECTION_DELAY = 200; + + /** Reconnection maximum period. */ + private static final int RECONNECTION_MAX_DELAY = 300_000; + /** Network timeout permission */ private static final String SET_NETWORK_TIMEOUT_PERM = "setNetworkTimeout"; @@ -149,15 +160,12 @@ public class JdbcThinConnection implements Connection { /** Connection properties. */ private final ConnectionProperties connProps; - /** Connected. */ - private volatile boolean connected; + /** The amount of potentially alive {@code JdbcThinTcpIo} instances - connections to server nodes. */ + private final AtomicInteger connCnt = new AtomicInteger(); /** Tracked statements to close on disconnect. */ private final Set<JdbcThinStatement> stmts = Collections.newSetFromMap(new IdentityHashMap<>()); - /** Query timeout timer */ - private final Timer timer; - /** Affinity cache. */ private AffinityCache affinityCache; @@ -165,10 +173,7 @@ public class JdbcThinConnection implements Connection { private volatile JdbcThinTcpIo singleIo; /** Node Ids tp ignite endpoints. */ - private final Map<UUID, JdbcThinTcpIo> ios = new ConcurrentHashMap<>(); - - /** Ignite endpoints to use for better performance in case of random access. */ - private JdbcThinTcpIo[] iosArr; + private final ConcurrentSkipListMap<UUID, JdbcThinTcpIo> ios = new ConcurrentSkipListMap<>(); /** Server index. */ private int srvIdx; @@ -188,6 +193,15 @@ public class JdbcThinConnection implements Connection { /** Network timeout. */ private int netTimeout; + /** Background periodical maintenance: query timeouts and reconnection handler. 
*/ + private final ScheduledExecutorService maintenanceExecutor = Executors.newScheduledThreadPool(2); + + /** Cancelable future for query timeout task. */ + private ScheduledFuture<?> qryTimeoutScheduledFut; + + /** Cancelable future for connections handler task. */ + private ScheduledFuture<?> connectionsHndScheduledFut; + /** * Creates new connection. * @@ -203,32 +217,30 @@ public class JdbcThinConnection implements Connection { schema = JdbcUtils.normalizeSchema(connProps.getSchema()); - timer = new Timer("query-timeout-timer"); - affinityAwareness = connProps.isAffinityAwareness(); ensureConnected(); + + if (affinityAwareness) + connectionsHndScheduledFut = maintenanceExecutor.scheduleWithFixedDelay(new ConnectionHandlerTask(), + 0, RECONNECTION_DELAY, TimeUnit.MILLISECONDS); } /** * @throws SQLException On connection error. */ private void ensureConnected() throws SQLException { - if (connected) + if (connCnt.get() > 0) return; assert !closed; assert ios.isEmpty(); - assert iosArr == null; - - HostAndPortRange[] srvs = connProps.getAddresses(); - if (affinityAwareness) - connectInAffinityAwarenessMode(srvs); + connectInBestEffortAffinityMode(); else - connectInCommonMode(srvs); + connectInCommonMode(); } /** @@ -445,6 +457,10 @@ public class JdbcThinConnection implements Connection { if (isClosed()) return; + closed = true; + + maintenanceExecutor.shutdown(); + if (streamState != null) { streamState.close(); @@ -457,23 +473,17 @@ public class JdbcThinConnection implements Connection { SQLException err = null; - closed = true; - if (affinityAwareness) { for (JdbcThinTcpIo clioIo : ios.values()) clioIo.close(); ios.clear(); - - iosArr = null; } else { if (singleIo != null) singleIo.close(); } - timer.cancel(); - if (err != null) throw err; } @@ -858,7 +868,7 @@ public class JdbcThinConnection implements Connection { throws SQLException { ensureConnected(); - RequestTimeoutTimerTask reqTimeoutTimerTask = null; + RequestTimeoutTask reqTimeoutTask = null; 
synchronized (mux) { if (ownThread != null) { @@ -870,16 +880,18 @@ public class JdbcThinConnection implements Connection { ownThread = Thread.currentThread(); } try { + JdbcThinTcpIo cliIo = null; try { - JdbcThinTcpIo cliIo = stickyIo == null ? cliIo(calculateNodeIds(req)) : stickyIo; + cliIo = (stickyIo == null || !stickyIo.connected()) ? cliIo(calculateNodeIds(req)) : stickyIo; if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT) { - reqTimeoutTimerTask = new RequestTimeoutTimerTask( + reqTimeoutTask = new RequestTimeoutTask( req instanceof JdbcBulkLoadBatchRequest ? stmt.currentRequestId() : req.requestId(), cliIo, stmt.requestTimeout()); - timer.schedule(reqTimeoutTimerTask, 0, REQUEST_TIMEOUT_PERIOD); + qryTimeoutScheduledFut = maintenanceExecutor.scheduleAtFixedRate(reqTimeoutTask, 0, + REQUEST_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS); } JdbcQueryExecuteRequest qryReq = null; @@ -892,13 +904,15 @@ public class JdbcThinConnection implements Connection { txIo = res.activeTransaction() ? 
cliIo : null; if (res.status() == IgniteQueryErrorCode.QUERY_CANCELED && stmt != null && - stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTimerTask != null && reqTimeoutTimerTask.expired.get()) { + stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask != null && + reqTimeoutTask.expired.get()) { throw new SQLTimeoutException(QueryCancelledException.ERR_MSG, SqlStateCode.QUERY_CANCELLED, IgniteQueryErrorCode.QUERY_CANCELED); } else if (res.status() != ClientListenerResponse.STATUS_SUCCESS) - throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()), res.status()); + throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()), + res.status()); updateAffinityCache(qryReq, res); @@ -908,16 +922,17 @@ public class JdbcThinConnection implements Connection { throw e; } catch (Exception e) { - onDisconnect(); + onDisconnect(cliIo); if (e instanceof SocketTimeoutException) throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, e); else - throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e); + throw new SQLException("Failed to communicate with Ignite cluster.", + SqlStateCode.CONNECTION_FAILURE, e); } finally { - if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTimerTask != null) - reqTimeoutTimerTask.cancel(); + if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask != null) + qryTimeoutScheduledFut.cancel(false); } } finally { @@ -932,7 +947,7 @@ public class JdbcThinConnection implements Connection { * * @param req Jdbc request for which we'll try to calculate node id. * @return node UUID or null if failed to calculate. - * @throws IOException If Exception occured during the network partiton destribution retrieval. + * @throws IOException If Exception occurred during the network partition distribution retrieval. * @throws SQLException If Failed to calculate derived partitions. 
*/ @Nullable private List<UUID> calculateNodeIds(JdbcRequest req) throws IOException, SQLException { @@ -982,12 +997,12 @@ public class JdbcThinConnection implements Connection { } /** - * Retrieve cache destribution for specified cache Id. + * Retrieve cache distribution for specified cache Id. * * @param cacheId Cache Id. - * @param partCnt Partitons count. + * @param partCnt Partitions count. * @return Partitions cache distribution. - * @throws IOException If Exception occured during the network partiton destribution retrieval. + * @throws IOException If Exception occurred during the network partition distribution retrieval. */ private UUID[] retrieveCacheDistribution(int cacheId, int partCnt) throws IOException { UUID[] cacheDistr = affinityCache.cacheDistribution(cacheId); @@ -997,7 +1012,8 @@ public class JdbcThinConnection implements Connection { JdbcResponse res; - res = cliIo(null).sendRequest(new JdbcCachePartitionsRequest(Collections.singleton(cacheId)), null); + res = cliIo(null).sendRequest(new JdbcCachePartitionsRequest(Collections.singleton(cacheId)), + null); assert res.status() == ClientListenerResponse.STATUS_SUCCESS; @@ -1007,7 +1023,7 @@ public class JdbcThinConnection implements Connection { affinityCache = new AffinityCache(resAffinityVer); else if (affinityCache.version().compareTo(resAffinityVer) > 0) { // Jdbc thin affinity cache is binded to the newer affinity topology version, so we should ignore retrieved - // partition destribution. Given situation might occur in case of concurrent race and is not + // partition distribution. Given situation might occur in case of concurrent race and is not // possible in single-threaded jdbc thin client, so it's a reserve for the future. 
return null; } @@ -1015,7 +1031,7 @@ public class JdbcThinConnection implements Connection { List<JdbcThinAffinityAwarenessMappingGroup> mappings = ((JdbcCachePartitionsResult)res.response()).getMappings(); - // Despite the fact that, at this moment, we request partition destribution only for one cache, + // Despite the fact that, at this moment, we request partition distribution only for one cache, // we might retrieve multiple caches but exactly with same distribution. assert mappings.size() == 1; @@ -1046,7 +1062,8 @@ public class JdbcThinConnection implements Connection { return derivedParts.tree().apply(partResDesc.partitionClientContext(), args); } catch (IgniteCheckedException e) { - throw new SQLException("Failed to calculate derived partitions for query.", SqlStateCode.INTERNAL_ERROR); + throw new SQLException("Failed to calculate derived partitions for query.", + SqlStateCode.INTERNAL_ERROR); } } @@ -1061,7 +1078,7 @@ public class JdbcThinConnection implements Connection { * @throws SQLException On any error. */ void sendQueryCancelRequest(JdbcQueryCancelRequest req, JdbcThinTcpIo cliIo) throws SQLException { - if (!connected) + if (connCnt.get() == 0) throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE); assert cliIo != null; @@ -1082,7 +1099,8 @@ public class JdbcThinConnection implements Connection { * @param stickyIO Sticky ignite endpoint. * @throws SQLException On any error. 
*/ - private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest req, JdbcThinTcpIo stickyIO) throws SQLException { + private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest req, JdbcThinTcpIo stickyIO) + throws SQLException { ensureConnected(); synchronized (mux) { @@ -1102,12 +1120,13 @@ public class JdbcThinConnection implements Connection { throw e; } catch (Exception e) { - onDisconnect(); + onDisconnect(stickyIO); if (e instanceof SocketTimeoutException) throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, e); else - throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e); + throw new SQLException("Failed to communicate with Ignite cluster.", + SqlStateCode.CONNECTION_FAILURE, e); } finally { synchronized (mux) { @@ -1126,24 +1145,20 @@ public class JdbcThinConnection implements Connection { /** * Called on IO disconnect: close the client IO and opened statements. */ - private void onDisconnect() { - if (!connected) - return; + private void onDisconnect(JdbcThinTcpIo cliIo) { + assert connCnt.get() > 0; if (affinityAwareness) { - for (JdbcThinTcpIo clioIo : ios.values()) - clioIo.close(); - - ios.clear(); + cliIo.close(); - iosArr = null; + ios.remove(cliIo.nodeId()); } else { if (singleIo != null) singleIo.close(); } - connected = false; + connCnt.decrementAndGet(); if (streamState != null) { streamState.close0(); @@ -1157,8 +1172,6 @@ public class JdbcThinConnection implements Connection { stmts.clear(); } - - timer.cancel(); } /** @@ -1308,7 +1321,7 @@ public class JdbcThinConnection implements Connection { if (err0 instanceof SQLException) throw (SQLException)err0; else { - onDisconnect(); + onDisconnect(streamingStickyIo); if (err0 instanceof SocketTimeoutException) throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, err0); @@ -1330,7 +1343,7 @@ public class JdbcThinConnection implements Connection { /** */ void close0() { - 
if (connected) { + if (connCnt.get() > 0) { try { executeBatch(true); } @@ -1395,7 +1408,6 @@ public class JdbcThinConnection implements Connection { * @param nodeIds Set of node's UUIDs. * @return Ignite endpoint to use for request/response transferring. */ - @SuppressWarnings("ZeroLengthArrayAllocation") private JdbcThinTcpIo cliIo(List<UUID> nodeIds) { if (!affinityAwareness) return singleIo; @@ -1404,12 +1416,12 @@ public class JdbcThinConnection implements Connection { return txIo; if (nodeIds == null || nodeIds.isEmpty()) - return iosArr[RND.nextInt(iosArr.length)]; + return randomIo(); JdbcThinTcpIo io = null; if (nodeIds.size() == 1) - io = ios.get(nodeIds.iterator().next()); + io = ios.get(nodeIds.get(0)); else { int initNodeId = RND.nextInt(nodeIds.size()); @@ -1427,7 +1439,42 @@ public class JdbcThinConnection implements Connection { } } - return io != null ? io : iosArr[RND.nextInt(iosArr.length)]; + return io != null ? io : randomIo(); + } + + /** + * Returns random tcpIo, based on random UUID, generated in a custom way with the help of {@code Random} + * instead of {@code SecureRandom}. It's valid, cause cryptographically strong pseudo + * random number generator is not required in this particular case. {@code Random} is much faster + * than {@code SecureRandom}. + * + * @return random tcpIo + */ + private JdbcThinTcpIo randomIo() { + byte[] randomBytes = new byte[16]; + + RND.nextBytes(randomBytes); + + randomBytes[6] &= 0x0f; /* clear version */ + randomBytes[6] |= 0x40; /* set to version 4 */ + randomBytes[8] &= 0x3f; /* clear variant */ + randomBytes[8] |= 0x80; /* set to IETF variant */ + + long msb = 0; + + long lsb = 0; + + for (int i=0; i<8; i++) + msb = (msb << 8) | (randomBytes[i] & 0xff); + + for (int i=8; i<16; i++) + lsb = (lsb << 8) | (randomBytes[i] & 0xff); + + UUID randomUUID = new UUID(msb, lsb); + + Map.Entry<UUID, JdbcThinTcpIo> entry = ios.ceilingEntry(randomUUID); + + return entry != null ? 
entry.getValue() : ios.floorEntry(randomUUID).getValue(); } /** @@ -1457,10 +1504,11 @@ public class JdbcThinConnection implements Connection { * Establishes a connection to ignite endpoint, trying all specified hosts and ports one by one. * Stops as soon as any connection is established. * - * @param srvs Ignite endpoints addresses. * @throws SQLException If failed to connect to ignite cluster. */ - private void connectInCommonMode(HostAndPortRange[] srvs) throws SQLException { + private void connectInCommonMode() throws SQLException { + HostAndPortRange[] srvs = connProps.getAddresses(); + List<Exception> exceptions = null; for (int i = 0; i < srvs.length; i++) { @@ -1481,7 +1529,7 @@ public class JdbcThinConnection implements Connection { singleIo = cliIo; - connected = true; + connCnt.incrementAndGet(); return; } @@ -1513,7 +1561,7 @@ public class JdbcThinConnection implements Connection { * @throws SQLException Umbrella exception. */ private void handleConnectExceptions(List<Exception> exceptions) throws SQLException { - if (!connected && exceptions != null) { + if (connCnt.get() == 0 && exceptions != null) { close(); if (exceptions.size() == 1) { @@ -1540,18 +1588,16 @@ public class JdbcThinConnection implements Connection { * Establishes a connection to ignite endpoint, trying all specified hosts and ports one by one. * Stops as soon as all iosArr are established. * - * @param srvs Ignite endpoints addresses. * @throws SQLException If failed to connect to at least one ignite endpoint, * or if endpoints versions are not the same. 
*/ - @SuppressWarnings("ZeroLengthArrayAllocation") - private void connectInAffinityAwarenessMode(HostAndPortRange[] srvs) throws SQLException { + private void connectInBestEffortAffinityMode() throws SQLException { List<Exception> exceptions = null; - IgniteProductVersion prevIgniteEnpointVer = null; + IgniteProductVersion prevIgniteEndpointVer = null; - for (int i = 0; i < srvs.length; i++) { - HostAndPortRange srv = srvs[i]; + for (int i = 0; i < connProps.getAddresses().length; i++) { + HostAndPortRange srv = connProps.getAddresses()[i]; try { InetAddress[] addrs = InetAddress.getAllByName(srv.host()); @@ -1563,14 +1609,18 @@ public class JdbcThinConnection implements Connection { new JdbcThinTcpIo(connProps, new InetSocketAddress(addr, port), 0); if (!cliIo.isAffinityAwarenessSupported()) { + cliIo.close(); + throw new SQLException("Failed to connect to Ignite node [url=" + connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." + - "Node doesn't support best affort affinity mode.", + "Node doesn't support affinity awareness mode.", SqlStateCode.INTERNAL_ERROR); } - if (prevIgniteEnpointVer != null && !prevIgniteEnpointVer.equals(cliIo.igniteVersion())) { + if (prevIgniteEndpointVer != null && !prevIgniteEndpointVer.equals(cliIo.igniteVersion())) { // TODO: 13.02.19 IGNITE-11321 JDBC Thin: implement nodes multi version support. + cliIo.close(); + throw new SQLException("Failed to connect to Ignite node [url=" + connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." + "Different versions of nodes are not supported in affinity awareness mode.", @@ -1579,17 +1629,18 @@ public class JdbcThinConnection implements Connection { cliIo.timeout(netTimeout); - JdbcThinTcpIo ioToSameNode = ios.get(cliIo.nodeId()); + JdbcThinTcpIo ioToSameNode = ios.putIfAbsent(cliIo.nodeId(), cliIo); - // This can happen if the same node has several IPs. 
+ // This can happen if the same node has several IPs or if connection manager background + // timer task runs concurrently. if (ioToSameNode != null) - ioToSameNode.close(); - - ios.put(cliIo.nodeId(), cliIo); + cliIo.close(); + else + connCnt.incrementAndGet(); - connected = true; + prevIgniteEndpointVer = cliIo.igniteVersion(); - prevIgniteEnpointVer = cliIo.igniteVersion(); + return; } catch (Exception exception) { if (exceptions == null) @@ -1609,14 +1660,49 @@ public class JdbcThinConnection implements Connection { } handleConnectExceptions(exceptions); + } + + /** + * Recreates affinity cache if affinity topology version was changed and adds partition result to sql cache. + * + * @param qryReq Query request. + * @param res Jdbc Response. + */ + private void updateAffinityCache(JdbcQueryExecuteRequest qryReq, JdbcResponse res) { + if (affinityAwareness) { + AffinityTopologyVersion resAffVer = res.affinityVersion(); + + if (resAffVer != null && (affinityCache == null || affinityCache.version().compareTo(resAffVer) < 0)) + affinityCache = new AffinityCache(resAffVer); + + // Partition result was requested. + if (res.response() instanceof JdbcQueryExecuteResult && qryReq.partitionResponseRequest()) { + PartitionResult partRes = ((JdbcQueryExecuteResult)res.response()).partitionResult(); + + if (partRes == null || affinityCache.version().equals(partRes.topologyVersion())) { + int cacheId = (partRes != null && partRes.tree() != null) ? + GridCacheUtils.cacheId(partRes.cacheName()) : + -1; + + PartitionClientContext partClientCtx = partRes != null ? 
+ new PartitionClientContext(partRes.partitionsCount()) : + null; + + QualifiedSQLQuery qry = new QualifiedSQLQuery(qryReq.schemaName(), qryReq.sqlQuery()); - iosArr = ios.values().toArray(new JdbcThinTcpIo[0]); + JdbcThinPartitionResultDescriptor partResDescr = + new JdbcThinPartitionResultDescriptor(partRes, cacheId, partClientCtx); + + affinityCache.addSqlQuery(qry, partResDescr); + } + } + } } /** - * Request Timeout Timer Task + * Request Timeout Task */ - private class RequestTimeoutTimerTask extends TimerTask { + private class RequestTimeoutTask implements Runnable { /** Request id. */ private final long reqId; @@ -1633,7 +1719,7 @@ public class JdbcThinConnection implements Connection { * @param reqId Request Id to cancel in case of timeout * @param initReqTimeout Initial request timeout */ - RequestTimeoutTimerTask(long reqId, JdbcThinTcpIo stickyIO, int initReqTimeout) { + RequestTimeoutTask(long reqId, JdbcThinTcpIo stickyIO, int initReqTimeout) { this.reqId = reqId; this.stickyIO = stickyIO; @@ -1651,7 +1737,9 @@ public class JdbcThinConnection implements Connection { sendQueryCancelRequest(new JdbcQueryCancelRequest(reqId), stickyIO); - cancel(); + qryTimeoutScheduledFut.cancel(false); + + return; } remainingQryTimeout -= REQUEST_TIMEOUT_PERIOD; @@ -1660,45 +1748,155 @@ public class JdbcThinConnection implements Connection { LOG.log(Level.WARNING, "Request timeout processing failure: unable to cancel request [reqId=" + reqId + ']', e); - cancel(); + qryTimeoutScheduledFut.cancel(false); } } } /** - * Recreates affinity cache if affinity topology version was changed and adds partition result to sql cache. - * - * @param qryReq Query request. - * @param res Jdbc Response. 
+ * Connection Handler Task */ - private void updateAffinityCache(JdbcQueryExecuteRequest qryReq, JdbcResponse res) { - if (affinityAwareness) { - AffinityTopologyVersion resAffVer = res.affinityVersion(); + private class ConnectionHandlerTask implements Runnable { + /** Map with reconnection delays. */ + private Map<InetSocketAddress, Integer> reconnectionDelays = new HashMap<>(); - if (resAffVer != null && (affinityCache == null || affinityCache.version().compareTo(resAffVer) < 0)) - affinityCache = new AffinityCache(resAffVer); + /** Map with reconnection delays remainder. */ + private Map<InetSocketAddress, Integer> reconnectionDelaysRemainder = new HashMap<>(); - // Partition result was requested. - if (res.response() instanceof JdbcQueryExecuteResult && qryReq.partitionResponseRequest()) { - PartitionResult partRes = ((JdbcQueryExecuteResult)res.response()).partitionResult(); + /** {@inheritDoc} */ + @Override public void run() { + try { + for (Map.Entry<InetSocketAddress, Integer> delayEntry : reconnectionDelaysRemainder.entrySet()) + reconnectionDelaysRemainder.put(delayEntry.getKey(), delayEntry.getValue() - RECONNECTION_DELAY); - if (partRes == null || affinityCache.version().equals(partRes.topologyVersion())) { - int cacheId = (partRes != null && partRes.tree() != null) ? - GridCacheUtils.cacheId(partRes.cacheName()) : - -1; + Set<InetSocketAddress> aliveSockAddrs = + ios.values().stream().map(JdbcThinTcpIo::socketAddress).collect(Collectors.toSet()); - PartitionClientContext partClientCtx = partRes != null ? 
- new PartitionClientContext(partRes.partitionsCount()) : - null; + IgniteProductVersion prevIgniteEndpointVer = null; - QualifiedSQLQuery qry = new QualifiedSQLQuery(qryReq.schemaName(), qryReq.sqlQuery()); + for (int i = 0; i < connProps.getAddresses().length; i++) { + HostAndPortRange srv = connProps.getAddresses()[i]; - JdbcThinPartitionResultDescriptor partResDescr = - new JdbcThinPartitionResultDescriptor(partRes, cacheId, partClientCtx); + try { + InetAddress[] addrs = InetAddress.getAllByName(srv.host()); - affinityCache.addSqlQuery(qry, partResDescr); + for (InetAddress addr : addrs) { + for (int port = srv.portFrom(); port <= srv.portTo(); ++port) { + InetSocketAddress sockAddr = null; + + try { + sockAddr = new InetSocketAddress(addr, port); + + if (aliveSockAddrs.contains(sockAddr)) { + reconnectionDelaysRemainder.remove(sockAddr); + reconnectionDelays.remove(sockAddr); + + continue; + } + + Integer delayRemainder = reconnectionDelaysRemainder.get(sockAddr); + + if (delayRemainder != null && delayRemainder != 0) + continue; + + if (closed) { + maintenanceExecutor.shutdown(); + + return; + } + + JdbcThinTcpIo cliIo = + new JdbcThinTcpIo(connProps, new InetSocketAddress(addr, port), 0); + + if (!cliIo.isAffinityAwarenessSupported()) { + processDelay(sockAddr); + + LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" + + connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." + + "Node doesn't support best effort affinity mode."); + + cliIo.close(); + + continue; + } + + if (prevIgniteEndpointVer != null && + !prevIgniteEndpointVer.equals(cliIo.igniteVersion())) { + processDelay(sockAddr); + + LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" + + connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." 
+ + "Different versions of nodes are not supported in best " + + "effort affinity mode."); + + cliIo.close(); + + continue; + } + + cliIo.timeout(netTimeout); + + JdbcThinTcpIo ioToSameNode = ios.putIfAbsent(cliIo.nodeId(), cliIo); + + // This can happen if the same node has several IPs or if ensureConnected() runs + // concurrently + if (ioToSameNode != null) + cliIo.close(); + else + connCnt.incrementAndGet(); + + prevIgniteEndpointVer = cliIo.igniteVersion(); + + if (closed) { + maintenanceExecutor.shutdown(); + + cliIo.close(); + + ios.remove(cliIo.nodeId()); + + return; + } + } + catch (Exception exception) { + if (sockAddr != null) + processDelay(sockAddr); + + LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" + + connProps.getUrl() + "]. address = [" + addr + ':' + port + "]."); + } + } + } + } + catch (Exception exception) { + LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" + + connProps.getUrl() + "]. server = [" + srv + "]."); + } } } + catch (Exception e) { + LOG.log(Level.WARNING, "Connection handler processing failure. Reconnection processes was stopped." + , e); + + connectionsHndScheduledFut.cancel(false); + } + } + + /** + * Increase reconnection delay if needed and store it to corresponding maps. + * + * @param sockAddr Socket address. + */ + private void processDelay(InetSocketAddress sockAddr) { + Integer delay = reconnectionDelays.get(sockAddr); + + delay = delay == null ? 
RECONNECTION_DELAY : delay * 2; + + if (delay > RECONNECTION_MAX_DELAY) + delay = RECONNECTION_MAX_DELAY; + + reconnectionDelays.put(sockAddr, delay); + + reconnectionDelaysRemainder.put(sockAddr, delay); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java index 366be79..7663a80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java @@ -122,7 +122,7 @@ public class JdbcThinTcpIo { private final BufferedInputStream in; /** Connected flag. */ - private boolean connected; + private volatile boolean connected; /** Ignite server version. */ private final IgniteProductVersion igniteVer; @@ -422,10 +422,9 @@ public class JdbcThinTcpIo { JdbcResponse resp = readResponse(); - if (stmt != null && stmt.isCancelled()) - return new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG); - else - return resp; + return stmt != null && stmt.isCancelled() ? + new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG) : + resp; } /** @@ -650,4 +649,18 @@ public class JdbcThinTcpIo { public UUID nodeId() { return nodeId; } + + /** + * @return Socket address. + */ + public InetSocketAddress socketAddress() { + return sockAddr; + } + + /** + * @return Connected flag. + */ + public boolean connected() { + return connected; + } }
