IGNITE-5376: JDBC thin: added "collocated" and "replicatedOnly" flags.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a9733921 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a9733921 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a9733921 Branch: refs/heads/ignite-5293 Commit: a9733921f2203075738448806ce9713037361cb3 Parents: f060582 Author: devozerov <[email protected]> Authored: Mon Jun 5 22:46:49 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon Jun 5 22:46:49 2017 +0300 ---------------------------------------------------------------------- .../jdbc/thin/JdbcThinConnectionSelfTest.java | 84 +++++++++++++++++--- .../org/apache/ignite/IgniteJdbcThinDriver.java | 4 +- .../internal/jdbc/thin/JdbcThinConnection.java | 8 +- .../internal/jdbc/thin/JdbcThinTcpIo.java | 45 ++++++++++- .../internal/jdbc/thin/JdbcThinUtils.java | 12 +++ .../processors/odbc/SqlListenerNioListener.java | 38 ++++----- .../odbc/jdbc/JdbcRequestHandler.java | 14 +++- 7 files changed, 166 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a9733921/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java index 8407a15..7ea22d5 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java @@ -125,25 +125,75 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { "Property cannot be negative [name=" + JdbcThinUtils.PARAM_SOCK_RCV_BUF); try (Connection conn = 
DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { - assertEquals(0, socket(conn).socketSendBuffer()); - assertEquals(0, socket(conn).socketReceiveBuffer()); + assertEquals(0, io(conn).socketSendBuffer()); + assertEquals(0, io(conn).socketReceiveBuffer()); } // Note that SO_* options are hints, so we check that value is equals to either what we set or to default. try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?socketSendBuffer=1024")) { - assertEquals(1024, socket(conn).socketSendBuffer()); - assertEquals(0, socket(conn).socketReceiveBuffer()); + assertEquals(1024, io(conn).socketSendBuffer()); + assertEquals(0, io(conn).socketReceiveBuffer()); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?socketReceiveBuffer=1024")) { - assertEquals(0, socket(conn).socketSendBuffer()); - assertEquals(1024, socket(conn).socketReceiveBuffer()); + assertEquals(0, io(conn).socketSendBuffer()); + assertEquals(1024, io(conn).socketReceiveBuffer()); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?" + "socketSendBuffer=1024&socketReceiveBuffer=2048")) { - assertEquals(1024, socket(conn).socketSendBuffer()); - assertEquals(2048, socket(conn).socketReceiveBuffer()); + assertEquals(1024, io(conn).socketSendBuffer()); + assertEquals(2048, io(conn).socketReceiveBuffer()); + } + } + + /** + * Test SQL hints. + * + * @throws Exception If failed. 
+ */ + public void testSqlHints() throws Exception { + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { + assertFalse(io(conn).distributedJoins()); + assertFalse(io(conn).enforceJoinOrder()); + assertFalse(io(conn).collocated()); + assertFalse(io(conn).replicatedOnly()); + } + + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true")) { + assertTrue(io(conn).distributedJoins()); + assertFalse(io(conn).enforceJoinOrder()); + assertFalse(io(conn).collocated()); + assertFalse(io(conn).replicatedOnly()); + } + + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?enforceJoinOrder=true")) { + assertFalse(io(conn).distributedJoins()); + assertTrue(io(conn).enforceJoinOrder()); + assertFalse(io(conn).collocated()); + assertFalse(io(conn).replicatedOnly()); + } + + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?collocated=true")) { + assertFalse(io(conn).distributedJoins()); + assertFalse(io(conn).enforceJoinOrder()); + assertTrue(io(conn).collocated()); + assertFalse(io(conn).replicatedOnly()); + } + + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?replicatedOnly=true")) { + assertFalse(io(conn).distributedJoins()); + assertFalse(io(conn).enforceJoinOrder()); + assertFalse(io(conn).collocated()); + assertTrue(io(conn).replicatedOnly()); + } + + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?distributedJoins=true&" + + "enforceJoinOrder=true&collocated=true&replicatedOnly=true")) { + assertTrue(io(conn).distributedJoins()); + assertTrue(io(conn).enforceJoinOrder()); + assertTrue(io(conn).collocated()); + assertTrue(io(conn).replicatedOnly()); } } @@ -152,7 +202,7 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { * * @throws Exception If failed. 
*/ - public void testPropertyTcpNoDelay() throws Exception { + public void testTcpNoDelay() throws Exception { assertInvalid("jdbc:ignite:thin://127.0.0.1?tcpNoDelay=0", "Failed to parse boolean property [name=" + JdbcThinUtils.PARAM_TCP_NO_DELAY); @@ -166,15 +216,23 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { "Failed to parse boolean property [name=" + JdbcThinUtils.PARAM_TCP_NO_DELAY); try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { - assertTrue(socket(conn).tcpNoDelay()); + assertTrue(io(conn).tcpNoDelay()); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?tcpNoDelay=true")) { - assertTrue(socket(conn).tcpNoDelay()); + assertTrue(io(conn).tcpNoDelay()); + } + + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?tcpNoDelay=True")) { + assertTrue(io(conn).tcpNoDelay()); } try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?tcpNoDelay=false")) { - assertFalse(socket(conn).tcpNoDelay()); + assertFalse(io(conn).tcpNoDelay()); + } + + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?tcpNoDelay=False")) { + assertFalse(io(conn).tcpNoDelay()); } } @@ -185,7 +243,7 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { * @return Socket. * @throws Exception If failed. 
*/ - private static JdbcThinTcpIo socket(Connection conn) throws Exception { + private static JdbcThinTcpIo io(Connection conn) throws Exception { JdbcThinConnection conn0 = conn.unwrap(JdbcThinConnection.class); return conn0.io(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a9733921/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java index 7766d98..1392d62 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java @@ -184,7 +184,9 @@ public class IgniteJdbcThinDriver implements Driver { new JdbcDriverPropertyInfo("Hostname", info.getProperty(JdbcThinUtils.PROP_HOST), ""), new JdbcDriverPropertyInfo("Port number", info.getProperty(JdbcThinUtils.PROP_PORT), ""), new JdbcDriverPropertyInfo("Distributed Joins", info.getProperty(JdbcThinUtils.PROP_DISTRIBUTED_JOINS), ""), - new JdbcDriverPropertyInfo("Enforce Join Order", info.getProperty(JdbcThinUtils.PROP_ENFORCE_JOIN_ORDER), "") + new JdbcDriverPropertyInfo("Enforce Join Order", info.getProperty(JdbcThinUtils.PROP_ENFORCE_JOIN_ORDER), ""), + new JdbcDriverPropertyInfo("Collocated", info.getProperty(JdbcThinUtils.PROP_COLLOCATED), ""), + new JdbcDriverPropertyInfo("Replicated only", info.getProperty(JdbcThinUtils.PROP_REPLICATED_ONLY), "") ); return props.toArray(new DriverPropertyInfo[0]); http://git-wip-us.apache.org/repos/asf/ignite/blob/a9733921/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java index b284f62..b372085 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java @@ -48,6 +48,8 @@ import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_HOST; import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_PORT; import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_DISTRIBUTED_JOINS; import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_ENFORCE_JOIN_ORDER; +import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_COLLOCATED; +import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_REPLICATED_ONLY; import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_SOCK_SND_BUF; import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_SOCK_RCV_BUF; import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_TCP_NO_DELAY; @@ -102,6 +104,8 @@ public class JdbcThinConnection implements Connection { boolean distributedJoins = extractBoolean(props, PROP_DISTRIBUTED_JOINS, false); boolean enforceJoinOrder = extractBoolean(props, PROP_ENFORCE_JOIN_ORDER, false); + boolean collocated = extractBoolean(props, PROP_COLLOCATED, false); + boolean replicatedOnly = extractBoolean(props, PROP_REPLICATED_ONLY, false); int sockSndBuf = extractIntNonNegative(props, PROP_SOCK_SND_BUF, 0); int sockRcvBuf = extractIntNonNegative(props, PROP_SOCK_RCV_BUF, 0); @@ -109,7 +113,7 @@ public class JdbcThinConnection implements Connection { boolean tcpNoDelay = extractBoolean(props, PROP_TCP_NO_DELAY, true); try { - cliIo = new JdbcThinTcpIo(host, port, distributedJoins, enforceJoinOrder, + cliIo = new JdbcThinTcpIo(host, port, distributedJoins, enforceJoinOrder, collocated, replicatedOnly, sockSndBuf, sockRcvBuf, tcpNoDelay); cliIo.start(); @@ -654,4 
+658,4 @@ public class JdbcThinConnection implements Connection { ", value=" + strVal + ']'); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a9733921/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java index 3f6edb8..bf8234d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java @@ -42,7 +42,6 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryMetadataResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult; -import org.apache.ignite.internal.util.ipc.IpcEndpoint; import org.apache.ignite.internal.util.ipc.loopback.IpcClientTcpEndpoint; import org.apache.ignite.internal.util.typedef.internal.U; @@ -80,6 +79,12 @@ public class JdbcThinTcpIo { /** Enforce join order. */ private final boolean enforceJoinOrder; + /** Collocated flag. */ + private final boolean collocated; + + /** Replicated only flag. */ + private final boolean replicatedOnly; + /** Socket send buffer. */ private final int sockSndBuf; @@ -108,16 +113,20 @@ public class JdbcThinTcpIo { * @param port Port. * @param distributedJoins Distributed joins flag. * @param enforceJoinOrder Enforce join order flag. + * @param collocated Collocated flag. + * @param replicatedOnly Replicated only flag. * @param sockSndBuf Socket send buffer. * @param sockRcvBuf Socket receive buffer. * @param tcpNoDelay TCP no delay flag. 
*/ - JdbcThinTcpIo(String host, int port, boolean distributedJoins, boolean enforceJoinOrder, int sockSndBuf, - int sockRcvBuf, boolean tcpNoDelay) { + JdbcThinTcpIo(String host, int port, boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, + boolean replicatedOnly, int sockSndBuf, int sockRcvBuf, boolean tcpNoDelay) { this.host = host; this.port = port; this.distributedJoins = distributedJoins; this.enforceJoinOrder = enforceJoinOrder; + this.collocated = collocated; + this.replicatedOnly = replicatedOnly; this.sockSndBuf = sockSndBuf; this.sockRcvBuf = sockRcvBuf; this.tcpNoDelay = tcpNoDelay; @@ -171,6 +180,8 @@ public class JdbcThinTcpIo { writer.writeBoolean(distributedJoins); writer.writeBoolean(enforceJoinOrder); + writer.writeBoolean(collocated); + writer.writeBoolean(replicatedOnly); send(writer.array()); @@ -343,6 +354,34 @@ public class JdbcThinTcpIo { } /** + * @return Distributed joins flag. + */ + public boolean distributedJoins() { + return distributedJoins; + } + + /** + * @return Enforce join order flag. + */ + public boolean enforceJoinOrder() { + return enforceJoinOrder; + } + + /** + * @return Collocated flag. + */ + public boolean collocated() { + return collocated; + } + + /** + * @return Replicated only flag. + */ + public boolean replicatedOnly() { + return replicatedOnly; + } + + /** * @return Socket send buffer size. 
*/ public int socketSendBuffer() { http://git-wip-us.apache.org/repos/asf/ignite/blob/a9733921/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java index 8d7d6b1..aa9b011 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java @@ -61,6 +61,12 @@ public class JdbcThinUtils { /** Parameter: enforce join order flag. */ public static final String PARAM_ENFORCE_JOIN_ORDER = "enforceJoinOrder"; + /** Parameter: collocated flag. */ + public static final String PARAM_COLLOCATED = "collocated"; + + /** Parameter: replicated only flag. */ + public static final String PARAM_REPLICATED_ONLY = "replicatedOnly"; + /** Parameter: socket send buffer. */ public static final String PARAM_SOCK_SND_BUF = "socketSendBuffer"; @@ -76,6 +82,12 @@ public class JdbcThinUtils { /** Transactions allowed property name. */ public static final String PROP_ENFORCE_JOIN_ORDER = PROP_PREFIX + PARAM_ENFORCE_JOIN_ORDER; + /** Collocated property name. */ + public static final String PROP_COLLOCATED = PROP_PREFIX + PARAM_COLLOCATED; + + /** Replicated only property name. */ + public static final String PROP_REPLICATED_ONLY = PROP_PREFIX + PARAM_REPLICATED_ONLY; + /** Socket send buffer property name. 
*/ public static final String PROP_SOCK_SND_BUF = PROP_PREFIX + PARAM_SOCK_SND_BUF; http://git-wip-us.apache.org/repos/asf/ignite/blob/a9733921/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java index f8f595f..e00b001 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java @@ -236,31 +236,31 @@ public class SqlListenerNioListener extends GridNioServerListenerAdapter<byte[]> private SqlListenerConnectionContext prepareContext(SqlListenerProtocolVersion ver, BinaryReaderExImpl reader) { byte clientType = reader.readByte(); - boolean distributedJoins = reader.readBoolean(); - boolean enforceJoinOrder = reader.readBoolean(); + if (clientType == ODBC_CLIENT) { + boolean distributedJoins = reader.readBoolean(); + boolean enforceJoinOrder = reader.readBoolean(); - SqlListenerRequestHandler handler; - SqlListenerMessageParser parser; + SqlListenerRequestHandler handler = new OdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, + enforceJoinOrder); - switch (clientType) { - case ODBC_CLIENT: - parser = new OdbcMessageParser(ctx); + SqlListenerMessageParser parser = new OdbcMessageParser(ctx); - handler = new OdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder); - - break; - - case JDBC_CLIENT: - parser = new JdbcMessageParser(ctx); + return new SqlListenerConnectionContext(handler, parser); + } + else if (clientType == JDBC_CLIENT) { + boolean distributedJoins = reader.readBoolean(); + boolean enforceJoinOrder = reader.readBoolean(); + boolean collocated = 
reader.readBoolean(); + boolean replicatedOnly = reader.readBoolean(); - handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder); + SqlListenerRequestHandler handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, + enforceJoinOrder, collocated, replicatedOnly); - break; + SqlListenerMessageParser parser = new JdbcMessageParser(ctx); - default: - throw new IgniteException("Unknown client type: " + clientType); + return new SqlListenerConnectionContext(handler, parser); } - - return new SqlListenerConnectionContext(handler, parser); + else + throw new IgniteException("Unknown client type: " + clientType); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a9733921/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index e64e9a6..0796cfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -68,6 +68,12 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler { /** Enforce join order flag. */ private final boolean enforceJoinOrder; + /** Collocated flag. */ + private final boolean collocated; + + /** Replicated only flag. */ + private final boolean replicatedOnly; + /** * Constructor. * @@ -76,14 +82,18 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler { * @param maxCursors Maximum allowed cursors. * @param distributedJoins Distributed joins flag. * @param enforceJoinOrder Enforce join order flag. + * @param collocated Collocated flag. 
+ * @param replicatedOnly Replicated only flag. */ public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors, - boolean distributedJoins, boolean enforceJoinOrder) { + boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly) { this.ctx = ctx; this.busyLock = busyLock; this.maxCursors = maxCursors; this.distributedJoins = distributedJoins; this.enforceJoinOrder = enforceJoinOrder; + this.collocated = collocated; + this.replicatedOnly = replicatedOnly; log = ctx.log(getClass()); } @@ -153,6 +163,8 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler { qry.setDistributedJoins(distributedJoins); qry.setEnforceJoinOrder(enforceJoinOrder); + qry.setCollocated(collocated); + qry.setReplicatedOnly(replicatedOnly); if (req.pageSize() <= 0) return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
