Repository: ignite Updated Branches: refs/heads/master 1a9c942b4 -> 898781aa0
IGNITE-5373: JDBC: added socket send buffer size, receive buffer size, tcp NO_DELAY flag to connection string. This closes #2076. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/898781aa Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/898781aa Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/898781aa Branch: refs/heads/master Commit: 898781aa03d9549f0e785b64770bdc1363d82d49 Parents: 1a9c942 Author: devozerov <[email protected]> Authored: Sat Jun 3 22:56:53 2017 +0300 Committer: devozerov <[email protected]> Committed: Sat Jun 3 22:56:53 2017 +0300 ---------------------------------------------------------------------- .../jdbc2/JdbcDynamicIndexAbstractSelfTest.java | 2 + .../jdbc/thin/JdbcThinConnectionSelfTest.java | 201 +++++++++++-------- .../JdbcThinDynamicIndexAbstractSelfTest.java | 2 + .../jdbc/thin/JdbcThinNoDefaultSchemaTest.java | 13 +- .../org/apache/ignite/IgniteJdbcThinDriver.java | 131 ++++-------- .../internal/jdbc/thin/JdbcThinConnection.java | 160 ++++++++++++--- .../internal/jdbc/thin/JdbcThinResultSet.java | 6 +- .../internal/jdbc/thin/JdbcThinStatement.java | 2 +- .../internal/jdbc/thin/JdbcThinTcpIo.java | 98 +++++++-- .../internal/jdbc/thin/JdbcThinUtils.java | 58 +++++- 10 files changed, 439 insertions(+), 234 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/898781aa/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java index 7bbda6f..d4da1f3 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java @@ -242,6 +242,8 @@ public abstract class JdbcDynamicIndexAbstractSelfTest extends JdbcAbstractDmlSt * Test that changes in cache affect index, and vice versa. */ public void testIndexState() throws SQLException { + fail("https://issues.apache.org/jira/browse/IGNITE-5373"); + IgniteCache<String, Person> cache = cache(); assertSize(3); http://git-wip-us.apache.org/repos/asf/ignite/blob/898781aa/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java index c3ce73a..8407a15 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java @@ -17,24 +17,23 @@ package org.apache.ignite.jdbc.thin; -import java.io.IOException; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.concurrent.Callable; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.jdbc.thin.JdbcThinConnection; +import org.apache.ignite.internal.jdbc.thin.JdbcThinTcpIo; +import org.apache.ignite.internal.jdbc.thin.JdbcThinUtils; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.NotNull; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.concurrent.Callable; + /** * Connection test. */ @@ -42,12 +41,6 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { /** IP finder. */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - /** URL prefix. */ - private static final String URL_PREFIX = "jdbc:ignite:thin://"; - - /** Host. */ - private static final String HOST = "127.0.0.1"; - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -93,122 +86,152 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { /** * @throws Exception If failed. */ + @SuppressWarnings({"EmptyTryBlock", "unused"}) public void testDefaults() throws Exception { - String url = URL_PREFIX + HOST; + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { + // No-op. + } - assert DriverManager.getConnection(url) != null; - assert DriverManager.getConnection(url + "/") != null; + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) { + // No-op. + } } /** + * Test invalid endpoint. + * * @throws Exception If failed. */ - public void testFailedHandshake() throws Exception { - final ServerSocket srvSock = new ServerSocket(60000, 0, InetAddress.getByName("127.0.0.1")); - - IgniteInternalFuture f = GridTestUtils.runAsync(new Runnable() { - @Override public void run() { - try { - Socket s = srvSock.accept(); - - s.close(); - } - catch (IOException e) { - log.error("Unexpected exception", e); - fail(); - } - } - }); + public void testInvalidEndpoint() throws Exception { + assertInvalid("jdbc:ignite:thin://", "Host name is empty"); + assertInvalid("jdbc:ignite:thin://:10000", "Host name is empty"); + assertInvalid("jdbc:ignite:thin:// :10000", "Host name is empty"); + + assertInvalid("jdbc:ignite:thin://127.0.0.1:-1", "Invalid port"); + assertInvalid("jdbc:ignite:thin://127.0.0.1:0", "Invalid port"); + assertInvalid("jdbc:ignite:thin://127.0.0.1:100000", "Invalid port"); + } + + /** + * Test invalid socket buffer sizes. + * + * @throws Exception If failed. + */ + public void testSocketBuffers() throws Exception { + assertInvalid("jdbc:ignite:thin://127.0.0.1?socketSendBuffer=-1", + "Property cannot be negative [name=" + JdbcThinUtils.PARAM_SOCK_SND_BUF); + + assertInvalid("jdbc:ignite:thin://127.0.0.1?socketReceiveBuffer=-1", + "Property cannot be negative [name=" + JdbcThinUtils.PARAM_SOCK_RCV_BUF); + + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { + assertEquals(0, socket(conn).socketSendBuffer()); + assertEquals(0, socket(conn).socketReceiveBuffer()); + } - try { - GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() { - @Override public Void call() throws Exception { - DriverManager.getConnection(URL_PREFIX + "127.0.0.1:60000"); + // Note that SO_* options are hints, so we check that value is equals to either what we set or to default. + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?socketSendBuffer=1024")) { + assertEquals(1024, socket(conn).socketSendBuffer()); + assertEquals(0, socket(conn).socketReceiveBuffer()); + } - return null; - } - }, SQLException.class, "Failed to connect to Ignite cluster [host=127.0.0.1, port=60000]"); + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?socketReceiveBuffer=1024")) { + assertEquals(0, socket(conn).socketSendBuffer()); + assertEquals(1024, socket(conn).socketReceiveBuffer()); } - finally { - f.get(3000); - srvSock.close(); + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?" + + "socketSendBuffer=1024&socketReceiveBuffer=2048")) { + assertEquals(1024, socket(conn).socketSendBuffer()); + assertEquals(2048, socket(conn).socketReceiveBuffer()); } } /** + * Test TCP no delay property handling. + * * @throws Exception If failed. */ - public void testInvalidUrls() throws Exception { - GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() { - @Override public Void call() throws Exception { - DriverManager.getConnection("q"); + public void testPropertyTcpNoDelay() throws Exception { + assertInvalid("jdbc:ignite:thin://127.0.0.1?tcpNoDelay=0", + "Failed to parse boolean property [name=" + JdbcThinUtils.PARAM_TCP_NO_DELAY); - return null; - } - }, SQLException.class, "No suitable driver found for q"); + assertInvalid("jdbc:ignite:thin://127.0.0.1?tcpNoDelay=1", + "Failed to parse boolean property [name=" + JdbcThinUtils.PARAM_TCP_NO_DELAY); - GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() { - @Override public Void call() throws Exception { - DriverManager.getConnection(URL_PREFIX + "127.0.0.1:-1"); + assertInvalid("jdbc:ignite:thin://127.0.0.1?tcpNoDelay=false1", + "Failed to parse boolean property [name=" + JdbcThinUtils.PARAM_TCP_NO_DELAY); - return null; - } - }, SQLException.class, "Invalid port:"); + assertInvalid("jdbc:ignite:thin://127.0.0.1?tcpNoDelay=true1", + "Failed to parse boolean property [name=" + JdbcThinUtils.PARAM_TCP_NO_DELAY); - GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() { - @Override public Void call() throws Exception { - DriverManager.getConnection(URL_PREFIX + "127.0.0.1:0"); + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { + assertTrue(socket(conn).tcpNoDelay()); + } - return null; - } - }, SQLException.class, "Invalid port:"); + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?tcpNoDelay=true")) { + assertTrue(socket(conn).tcpNoDelay()); + } - GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() { - @Override public Void call() throws Exception { - DriverManager.getConnection(URL_PREFIX + "127.0.0.1:100000"); + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?tcpNoDelay=false")) { + assertFalse(socket(conn).tcpNoDelay()); + } + } - return null; - } - }, SQLException.class, "Invalid port:"); + /** + * Get client socket for connection. + * + * @param conn Connection. + * @return Socket. + * @throws Exception If failed. + */ + private static JdbcThinTcpIo socket(Connection conn) throws Exception { + JdbcThinConnection conn0 = conn.unwrap(JdbcThinConnection.class); + return conn0.io(); + } + + /** + * Assert that provided URL is invalid. + * + * @param url URL. + * @param errMsg Error message. + */ + @SuppressWarnings("ThrowableNotThrown") + private void assertInvalid(final String url, String errMsg) { GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() { @Override public Void call() throws Exception { - DriverManager.getConnection(URL_PREFIX + " :10000"); + DriverManager.getConnection(url); return null; } - }, SQLException.class, "Host name is empty"); + }, SQLException.class, errMsg); } /** * @throws Exception If failed. */ + @SuppressWarnings("ThrowableNotThrown") public void testClose() throws Exception { - String url = URL_PREFIX + HOST; - - final Connection conn = DriverManager.getConnection(url); + final Connection conn; - assert conn != null; - assert !conn.isClosed(); + try (Connection conn0 = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { + conn = conn0; - conn.close(); + assert conn != null; + assert !conn.isClosed(); + } assert conn.isClosed(); assert !conn.isValid(2): "Connection must be closed"; - GridTestUtils.assertThrows( - log, - new Callable<Object>() { - @Override public Object call() throws Exception { - conn.isValid(-2); - - return null; - } - }, - SQLException.class, - "Invalid timeout" - ); + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + conn.isValid(-2); + + return null; + } + }, SQLException.class, "Invalid timeout"); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/898781aa/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java index 3f762fc..7404ebd 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java @@ -254,6 +254,8 @@ public abstract class JdbcThinDynamicIndexAbstractSelfTest extends JdbcThinAbstr * @throws SQLException If failed. */ public void testIndexState() throws SQLException { + fail("https://issues.apache.org/jira/browse/IGNITE-5373"); + IgniteCache<String, Person> cache = cache(); assertSize(3); http://git-wip-us.apache.org/repos/asf/ignite/blob/898781aa/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinNoDefaultSchemaTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinNoDefaultSchemaTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinNoDefaultSchemaTest.java index 97008c8..cea176e 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinNoDefaultSchemaTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinNoDefaultSchemaTest.java @@ -44,7 +44,7 @@ public class JdbcThinNoDefaultSchemaTest extends JdbcThinAbstractSelfTest { private static final String CACHE2_NAME = "cache2"; /** URL. */ - private static final String URL = "jdbc:ignite:thin://127.0.0.1/"; + private static final String URL = "jdbc:ignite:thin://127.0.0.1"; /** Grid count. */ private static final int GRID_CNT = 2; @@ -107,15 +107,14 @@ public class JdbcThinNoDefaultSchemaTest extends JdbcThinAbstractSelfTest { /** * @throws Exception If failed. */ + @SuppressWarnings({"EmptyTryBlock", "unused"}) public void testDefaults() throws Exception { - String url = URL; - - try (Connection conn = DriverManager.getConnection(url)) { - assertNotNull(conn); + try (Connection conn = DriverManager.getConnection(URL)) { + // No-op. } - try (Connection conn = DriverManager.getConnection(url + '/')) { - assertNotNull(conn); + try (Connection conn = DriverManager.getConnection(URL + '/')) { + // No-op. } } http://git-wip-us.apache.org/repos/asf/ignite/blob/898781aa/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java index 5c04701..665ac69 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java @@ -28,10 +28,11 @@ import java.util.List; import java.util.Properties; import java.util.logging.Logger; import org.apache.ignite.cache.affinity.AffinityKey; -import org.apache.ignite.configuration.OdbcConfiguration; import org.apache.ignite.internal.IgniteVersionUtils; import org.apache.ignite.internal.jdbc.JdbcDriverPropertyInfo; import org.apache.ignite.internal.jdbc.thin.JdbcThinConnection; +import org.apache.ignite.internal.jdbc.thin.JdbcThinUtils; +import org.apache.ignite.internal.util.typedef.F; /** * JDBC driver thin implementation for In-Memory Data Grid. @@ -132,32 +133,6 @@ import org.apache.ignite.internal.jdbc.thin.JdbcThinConnection; */ @SuppressWarnings("JavadocReference") public class IgniteJdbcThinDriver implements Driver { - /** Prefix for property names. */ - private static final String PROP_PREFIX = "ignite.jdbc"; - - /** Distributed joins parameter name. */ - private static final String PARAM_DISTRIBUTED_JOINS = "distributedJoins"; - - /** Enforce join order parameter name. */ - private static final String ENFORCE_JOIN_ORDER = "enforceJoinOrder"; - - /** Hostname property name. */ - public static final String PROP_HOST = PROP_PREFIX + "host"; - - /** Port number property name. */ - public static final String PROP_PORT = PROP_PREFIX + "port"; - - /** Distributed joins property name. */ - public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS; - - /** Transactions allowed property name. */ - public static final String PROP_ENFORCE_JOIN_ORDER = PROP_PREFIX + ENFORCE_JOIN_ORDER; - - /** URL prefix. */ - public static final String URL_PREFIX = "jdbc:ignite:thin://"; - - /** Default port. */ - public static final int DFLT_PORT = OdbcConfiguration.DFLT_TCP_PORT_FROM; /** Major version. */ private static final int MAJOR_VER = IgniteVersionUtils.VER.major(); @@ -182,27 +157,25 @@ public class IgniteJdbcThinDriver implements Driver { if (!acceptsURL(url)) return null; - if (!parseUrl(url, props)) - throw new SQLException("URL is invalid: " + url); + parseUrl(url, props); return new JdbcThinConnection(url, props); } /** {@inheritDoc} */ @Override public boolean acceptsURL(String url) throws SQLException { - return url.startsWith(URL_PREFIX); + return url.startsWith(JdbcThinUtils.URL_PREFIX); } /** {@inheritDoc} */ @Override public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { - if (!parseUrl(url, info)) - throw new SQLException("URL is invalid: " + url); + parseUrl(url, info); List<DriverPropertyInfo> props = Arrays.<DriverPropertyInfo>asList( - new JdbcDriverPropertyInfo("Hostname", info.getProperty(PROP_HOST), ""), - new JdbcDriverPropertyInfo("Port number", info.getProperty(PROP_PORT), ""), - new JdbcDriverPropertyInfo("Distributed Joins", info.getProperty(PROP_DISTRIBUTED_JOINS), ""), - new JdbcDriverPropertyInfo("Enforce Join Order", info.getProperty(PROP_ENFORCE_JOIN_ORDER), "") + new JdbcDriverPropertyInfo("Hostname", info.getProperty(JdbcThinUtils.PROP_HOST), ""), + new JdbcDriverPropertyInfo("Port number", info.getProperty(JdbcThinUtils.PROP_PORT), ""), + new JdbcDriverPropertyInfo("Distributed Joins", info.getProperty(JdbcThinUtils.PROP_DISTRIBUTED_JOINS), ""), + new JdbcDriverPropertyInfo("Enforce Join Order", info.getProperty(JdbcThinUtils.PROP_ENFORCE_JOIN_ORDER), "") ); return props.toArray(new DriverPropertyInfo[0]); @@ -233,83 +206,65 @@ public class IgniteJdbcThinDriver implements Driver { * * @param props Properties. * @param url URL. - * @return Whether URL is valid. - */ - private boolean parseUrl(String url, Properties props) { - if (url == null) - return false; - - if (url.startsWith(URL_PREFIX) && url.length() > URL_PREFIX.length()) - return parseJdbcUrl(url, props); - - return false; - } - - /** - * @param url Url. - * @param props Properties. - * @return Whether URL is valid. */ - private boolean parseJdbcUrl(String url, Properties props) { - url = url.substring(URL_PREFIX.length()); - - String[] parts = url.split("\\?"); - - if (parts.length > 2) - return false; + private void parseUrl(String url, Properties props) throws SQLException { + if (F.isEmpty(url)) + throw new SQLException("URL cannot be null or empty."); - if (parts.length == 2) - if (!parseParameters(parts[1], "&", props)) - return false; + if (!url.startsWith(JdbcThinUtils.URL_PREFIX)) + throw new SQLException("URL must start with \"" + JdbcThinUtils.URL_PREFIX + "\""); - parts = parts[0].split("/"); + String nakedUrl = url.substring(JdbcThinUtils.URL_PREFIX.length()).trim(); - assert parts.length > 0; + String[] nakedUrlParts = nakedUrl.split("\\?"); - if (parts.length > 1) - return false; + if (nakedUrlParts.length > 2) + throw new SQLException("Invalid URL format (only one ? character is allowed): " + url); - url = parts[0]; + String endpoint = nakedUrlParts[0]; - parts = url.split(":"); + if (endpoint.endsWith("/")) + endpoint = endpoint.substring(0, endpoint.length() - 1); - assert parts.length > 0; + String[] endpointParts = endpoint.split(":"); - if (parts.length > 2) - return false; + if (endpointParts.length > 2) + throw new SQLException("Invalid endpoint format (should be \"host[:port]\"): " + endpoint); - props.setProperty(PROP_HOST, parts[0]); + props.setProperty(JdbcThinUtils.PROP_HOST, endpointParts[0]); - try { - props.setProperty(PROP_PORT, String.valueOf(parts.length == 2 ? Integer.valueOf(parts[1]) : DFLT_PORT)); - } - catch (NumberFormatException ignored) { - return false; - } + if (endpointParts.length == 2) + props.setProperty(JdbcThinUtils.PROP_PORT, endpointParts[1]); - return true; + if (nakedUrlParts.length == 2) + parseParameters(nakedUrlParts[1], props); } /** * Validates and parses URL parameters. * - * @param val Parameters string. + * @param str Parameters string. * @param delim Delimiter. * @param props Properties. - * @return Whether URL parameters string is valid. + * @throws SQLException If failed. */ - private boolean parseParameters(String val, String delim, Properties props) { - String[] params = val.split(delim); + private void parseParameters(String str, Properties props) throws SQLException { + String[] params = str.split("&"); for (String param : params) { String[] pair = param.split("="); - if (pair.length != 2 || pair[0].isEmpty() || pair[1].isEmpty()) - return false; + if (pair.length != 2) + throw new SQLException("Invalid parameter format (only one = character is allowed per key/value " + + "pair: " + param); - props.setProperty(PROP_PREFIX + pair[0], pair[1]); - } + String key = pair[0].trim(); + String val = pair[1].trim(); + + if (key.isEmpty() || val.isEmpty()) + throw new SQLException("Invalid parameter format (key and value cannot be empty): " + param); - return true; + props.setProperty(JdbcThinUtils.PROP_PREFIX + key, val); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/898781aa/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java index 4ba7557..b284f62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.jdbc.thin; +import org.apache.ignite.internal.util.typedef.F; + import java.sql.Array; import java.sql.Blob; import java.sql.CallableStatement; @@ -41,10 +43,14 @@ import java.util.logging.Logger; import static java.sql.ResultSet.CONCUR_READ_ONLY; import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT; import static java.sql.ResultSet.TYPE_FORWARD_ONLY; -import static org.apache.ignite.IgniteJdbcThinDriver.PROP_DISTRIBUTED_JOINS; -import static org.apache.ignite.IgniteJdbcThinDriver.PROP_ENFORCE_JOIN_ORDER; -import static org.apache.ignite.IgniteJdbcThinDriver.PROP_HOST; -import static org.apache.ignite.IgniteJdbcThinDriver.PROP_PORT; + +import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_HOST; +import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_PORT; +import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_DISTRIBUTED_JOINS; +import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_ENFORCE_JOIN_ORDER; +import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_SOCK_SND_BUF; +import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_SOCK_RCV_BUF; +import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_TCP_NO_DELAY; /** * JDBC connection implementation. @@ -91,36 +97,27 @@ public class JdbcThinConnection implements Connection { autoCommit = true; txIsolation = Connection.TRANSACTION_NONE; - boolean distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS, "false")); - boolean enforceJoinOrder = Boolean.parseBoolean(props.getProperty(PROP_ENFORCE_JOIN_ORDER, "false")); + String host = extractHost(props); + int port = extractPort(props); - String host = props.getProperty(PROP_HOST); - String portStr = props.getProperty(PROP_PORT); + boolean distributedJoins = extractBoolean(props, PROP_DISTRIBUTED_JOINS, false); + boolean enforceJoinOrder = extractBoolean(props, PROP_ENFORCE_JOIN_ORDER, false); - try { - int port = Integer.parseInt(portStr); + int sockSndBuf = extractIntNonNegative(props, PROP_SOCK_SND_BUF, 0); + int sockRcvBuf = extractIntNonNegative(props, PROP_SOCK_RCV_BUF, 0); - if (port <= 0 || port > 0xFFFF) - throw new SQLException("Invalid port: " + portStr); - } - catch (NumberFormatException e) { - throw new SQLException("Invalid port: " + portStr, e); - } - - if (host == null || host.trim().isEmpty()) - throw new SQLException("Host name is empty."); - - String endpoint = host.trim() + ":" + portStr.trim(); + boolean tcpNoDelay = extractBoolean(props, PROP_TCP_NO_DELAY, true); try { - cliIo = new JdbcThinTcpIo(endpoint, distributedJoins, enforceJoinOrder); + cliIo = new JdbcThinTcpIo(host, port, distributedJoins, enforceJoinOrder, + sockSndBuf, sockRcvBuf, tcpNoDelay); cliIo.start(); } catch (Exception e) { cliIo.close(); - throw new SQLException("Failed to connect to Ignite cluster [host=" + host + ", port=" + portStr + ']', e); + throw new SQLException("Failed to connect to Ignite node [host=" + host + ", port=" + port + ']', e); } } @@ -486,6 +483,7 @@ public class JdbcThinConnection implements Connection { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> iface) throws SQLException { if (!isWrapperFor(iface)) throw new SQLException("Connection is not a wrapper for " + iface.getName()); @@ -539,7 +537,121 @@ public class JdbcThinConnection implements Connection { /** * @return Ignite endpoint and I/O protocol. */ - JdbcThinTcpIo cliIo() { + public JdbcThinTcpIo io() { return cliIo; } + + /** + * Extract host. + * + * @param props Properties. + * @return Host. + * @throws SQLException If failed. + */ + private static String extractHost(Properties props) throws SQLException { + String host = props.getProperty(PROP_HOST); + + if (host != null) + host = host.trim(); + + if (F.isEmpty(host)) + throw new SQLException("Host name is empty."); + + return host; + } + + /** + * Extract port. + * + * @param props Properties. + * @return Port. + * @throws SQLException If failed. + */ + private static int extractPort(Properties props) throws SQLException { + String portStr = props.getProperty(PROP_PORT); + + if (portStr == null) + return JdbcThinUtils.DFLT_PORT; + + int port; + + try { + port = Integer.parseInt(portStr); + + if (port <= 0 || port > 0xFFFF) + throw new SQLException("Invalid port: " + portStr); + } + catch (NumberFormatException e) { + throw new SQLException("Invalid port: " + portStr, e); + } + + return port; + } + + /** + * Extract boolean property. + * + * @param props Properties. + * @param propName Property name. + * @param dfltVal Default value. + * @return Value. + * @throws SQLException If failed. + */ + private static boolean extractBoolean(Properties props, String propName, boolean dfltVal) throws SQLException { + String strVal = props.getProperty(propName); + + if (strVal == null) + return dfltVal; + + if (Boolean.TRUE.toString().equalsIgnoreCase(strVal)) + return true; + else if (Boolean.FALSE.toString().equalsIgnoreCase(strVal)) + return false; + else + throw new SQLException("Failed to parse boolean property [name=" + JdbcThinUtils.trimPrefix(propName) + + ", value=" + strVal + ']'); + } + + /** + * Extract non-negative int property. + * + * @param props Properties. + * @param propName Property name. + * @param dfltVal Default value. + * @return Value. + * @throws SQLException If failed. + */ + private static int extractIntNonNegative(Properties props, String propName, int dfltVal) throws SQLException { + int res = extractInt(props, propName, dfltVal); + + if (res < 0) + throw new SQLException("Property cannot be negative [name=" + JdbcThinUtils.trimPrefix(propName) + + ", value=" + res + ']'); + + return res; + } + + /** + * Extract int property. + * + * @param props Properties. + * @param propName Property name. + * @param dfltVal Default value. + * @return Value. + * @throws SQLException If failed. + */ + private static int extractInt(Properties props, String propName, int dfltVal) throws SQLException { + String strVal = props.getProperty(propName); + + if (strVal == null) + return dfltVal; + + try { + return Integer.parseInt(strVal); + } + catch (NumberFormatException e) { + throw new SQLException("Failed to parse int property [name=" + JdbcThinUtils.trimPrefix(propName) + + ", value=" + strVal + ']'); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/898781aa/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java index 36f938b..87bc526 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java @@ -136,7 +136,7 @@ public class JdbcThinResultSet implements ResultSet { if (rowsIter == null && !finished) { try { - JdbcQueryFetchResult res = stmt.connection().cliIo().queryFetch(qryId, fetchSize); + JdbcQueryFetchResult res = stmt.connection().io().queryFetch(qryId, fetchSize); rows = res.items(); finished = res.last(); @@ -178,7 +178,7 @@ public class JdbcThinResultSet implements ResultSet { return; try { - stmt.connection().cliIo().queryClose(qryId); + stmt.connection().io().queryClose(qryId); closed = true; } @@ -1618,7 +1618,7 @@ public class JdbcThinResultSet implements ResultSet { private List<JdbcColumnMeta> meta() throws SQLException { if (!metaInit) { try { - JdbcQueryMetadataResult res = stmt.connection().cliIo().queryMeta(qryId); + JdbcQueryMetadataResult res = stmt.connection().io().queryMeta(qryId); meta = res.meta(); http://git-wip-us.apache.org/repos/asf/ignite/blob/898781aa/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java index 91b8b06..a0b7ee6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java @@ -106,7 +106,7 @@ public class JdbcThinStatement implements Statement { throw new SQLException("SQL query is empty."); try { - JdbcQueryExecuteResult res = conn.cliIo().queryExecute(conn.getSchema(), pageSize, maxRows, + JdbcQueryExecuteResult res = conn.io().queryExecute(conn.getSchema(), pageSize, maxRows, sql, args); assert res != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/898781aa/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java index 136fcf9..3f6edb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java @@ -20,8 +20,9 @@ package org.apache.ignite.internal.jdbc.thin; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; import java.util.List; -import java.util.logging.Logger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.binary.BinaryWriterExImpl; @@ -41,9 +42,8 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryMetadataResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult; -import org.apache.ignite.internal.processors.odbc.jdbc.JdbcUtils; import org.apache.ignite.internal.util.ipc.IpcEndpoint; -import org.apache.ignite.internal.util.ipc.IpcEndpointFactory; +import org.apache.ignite.internal.util.ipc.loopback.IpcClientTcpEndpoint; import org.apache.ignite.internal.util.typedef.internal.U; /** @@ -68,14 +68,29 @@ public class JdbcThinTcpIo { /** Initial output for query close message. */ private static final int QUERY_CLOSE_MSG_SIZE = 9; - /** Logger. */ - private static final Logger log = Logger.getLogger(JdbcThinTcpIo.class.getName()); + /** Host. */ + private final String host; - /** Server endpoint address. */ - private final String endpointAddr; + /** Port. */ + private final int port; + + /** Distributed joins. */ + private final boolean distributedJoins; + + /** Enforce join order. */ + private final boolean enforceJoinOrder; + + /** Socket send buffer. */ + private final int sockSndBuf; + + /** Socket receive buffer. */ + private final int sockRcvBuf; + + /** TCP no delay flag. */ + private final boolean tcpNoDelay; /** Endpoint. */ - private IpcEndpoint endpoint; + private IpcClientTcpEndpoint endpoint; /** Output stream. */ private BufferedOutputStream out; @@ -83,26 +98,29 @@ public class JdbcThinTcpIo { /** Input stream. */ private BufferedInputStream in; - /** Distributed joins. */ - private boolean distributedJoins; - - /** Enforce join order. */ - private boolean enforceJoinOrder; - /** Closed flag. */ private boolean closed; /** - * @param endpointAddr Endpoint. + * Constructor. + * + * @param host Host. + * @param port Port. * @param distributedJoins Distributed joins flag. * @param enforceJoinOrder Enforce join order flag. + * @param sockSndBuf Socket send buffer. + * @param sockRcvBuf Socket receive buffer. + * @param tcpNoDelay TCP no delay flag. */ - JdbcThinTcpIo(String endpointAddr, boolean distributedJoins, boolean enforceJoinOrder) { - assert endpointAddr != null; - - this.endpointAddr = endpointAddr; + JdbcThinTcpIo(String host, int port, boolean distributedJoins, boolean enforceJoinOrder, int sockSndBuf, + int sockRcvBuf, boolean tcpNoDelay) { + this.host = host; + this.port = port; this.distributedJoins = distributedJoins; - this.enforceJoinOrder= enforceJoinOrder; + this.enforceJoinOrder = enforceJoinOrder; + this.sockSndBuf = sockSndBuf; + this.sockRcvBuf = sockRcvBuf; + this.tcpNoDelay = tcpNoDelay; } /** @@ -110,7 +128,24 @@ public class JdbcThinTcpIo { * @throws IOException On IO error in handshake. */ public void start() throws IgniteCheckedException, IOException { - endpoint = IpcEndpointFactory.connectEndpoint(endpointAddr, null); + Socket sock = new Socket(); + + if (sockSndBuf != 0) + sock.setSendBufferSize(sockSndBuf); + + if (sockRcvBuf != 0) + sock.setReceiveBufferSize(sockRcvBuf); + + sock.setTcpNoDelay(tcpNoDelay); + + try { + sock.connect(new InetSocketAddress(host, port)); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to connect to server [host=" + host + ", port=" + port + ']', e); + } + + endpoint = new IpcClientTcpEndpoint(sock); out = new BufferedOutputStream(endpoint.outputStream()); in = new BufferedInputStream(endpoint.inputStream()); @@ -306,4 +341,25 @@ public class JdbcThinTcpIo { closed = true; } + + /** + * @return Socket send buffer size. + */ + public int socketSendBuffer() { + return sockSndBuf; + } + + /** + * @return Socket receive buffer size. + */ + public int socketReceiveBuffer() { + return sockRcvBuf; + } + + /** + * @return TCP no delay flag. + */ + public boolean tcpNoDelay() { + return tcpNoDelay; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/898781aa/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java index 7f4c111..a346ea8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.jdbc.thin; +import org.apache.ignite.configuration.OdbcConfiguration; + import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; @@ -40,6 +42,61 @@ import static java.sql.Types.VARCHAR; * Utility methods for thin JDBC driver. */ public class JdbcThinUtils { + /** URL prefix. */ + public static final String URL_PREFIX = "jdbc:ignite:thin://"; + + /** Prefix for property names. */ + public static final String PROP_PREFIX = "ignite.jdbc"; + + /** Port number property name. */ + public static final String PROP_PORT = PROP_PREFIX + "port"; + + /** Hostname property name. */ + public static final String PROP_HOST = PROP_PREFIX + "host"; + + /** Parameter: distributed joins flag. */ + public static final String PARAM_DISTRIBUTED_JOINS = "distributedJoins"; + + /** Parameter: enforce join order flag. */ + public static final String PARAM_ENFORCE_JOIN_ORDER = "enforceJoinOrder"; + + /** Parameter: socket send buffer. */ + public static final String PARAM_SOCK_SND_BUF = "socketSendBuffer"; + + /** Parameter: socket receive buffer. */ + public static final String PARAM_SOCK_RCV_BUF = "socketReceiveBuffer"; + + /** Parameter: TCP no-delay flag. */ + public static final String PARAM_TCP_NO_DELAY = "tcpNoDelay"; + + /** Distributed joins property name. */ + public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS; + + /** Transactions allowed property name. */ + public static final String PROP_ENFORCE_JOIN_ORDER = PROP_PREFIX + PARAM_ENFORCE_JOIN_ORDER; + + /** Socket send buffer property name. */ + public static final String PROP_SOCK_SND_BUF = PROP_PREFIX + PARAM_SOCK_SND_BUF; + + /** Socket receive buffer property name. */ + public static final String PROP_SOCK_RCV_BUF = PROP_PREFIX + PARAM_SOCK_RCV_BUF; + + /** TCP no delay property name. */ + public static final String PROP_TCP_NO_DELAY = PROP_PREFIX + PARAM_TCP_NO_DELAY; + + /** Default port. */ + public static final int DFLT_PORT = OdbcConfiguration.DFLT_TCP_PORT_FROM; + + /** + * Trim prefix from property. + * + * @param prop Property. + * @return Parameter name. + */ + public static String trimPrefix(String prop) { + return prop.substring(PROP_PREFIX.length()); + } + /** * Converts Java class name to type from {@link Types}. * @@ -126,7 +183,6 @@ public class JdbcThinUtils { && type != Types.NVARCHAR && type != Types.LONGNVARCHAR && type != Types.REF - && type != Types.NCHAR && type != Types.ROWID && type != Types.SQLXML; }
