This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a025c1a IGNITE-13002 Supports user's java object by JDBC thin client
(#7812)
a025c1a is described below
commit a025c1a62750459209e9817997e8717035ff0fc5
Author: korlov42 <[email protected]>
AuthorDate: Mon May 25 16:07:30 2020 +0300
IGNITE-13002 Supports user's java object by JDBC thin client (#7812)
---
.../jdbc/thin/JdbcThinConnectionSelfTest.java | 68 ++-
.../thin/JdbcThinPreparedStatementSelfTest.java | 310 +++++++++--
.../jdbc/thin/JdbcThinResultSetSelfTest.java | 49 +-
.../thin/JdbcThinStreamingAbstractSelfTest.java | 68 +++
.../apache/ignite/jdbc/thin/JdbcThinTcpIoTest.java | 4 +-
.../ignite/internal/binary/BinaryContext.java | 14 +-
.../ignite/internal/binary/BinaryMarshaller.java | 2 +-
.../ignite/internal/binary/BinaryReaderExImpl.java | 6 -
.../internal/binary/GridBinaryMarshaller.java | 3 -
.../client/thin/ClientBinaryMarshaller.java | 2 +-
.../internal/jdbc/thin/ConnectionProperties.java | 16 +
.../jdbc/thin/ConnectionPropertiesImpl.java | 17 +-
.../internal/jdbc/thin/JdbcThinConnection.java | 620 +++++++++++++++++----
.../jdbc/thin/JdbcThinPreparedStatement.java | 17 +-
.../internal/jdbc/thin/JdbcThinResultSet.java | 2 +-
.../internal/jdbc/thin/JdbcThinStatement.java | 2 +-
.../ignite/internal/jdbc/thin/JdbcThinTcpIo.java | 36 +-
.../ignite/internal/jdbc2/JdbcResultSet.java | 2 +-
.../processors/odbc/ClientListenerNioListener.java | 2 +-
.../internal/processors/odbc/SqlListenerUtils.java | 28 +-
.../odbc/jdbc/JdbcBinaryTypeGetRequest.java | 76 +++
.../odbc/jdbc/JdbcBinaryTypeGetResult.java | 107 ++++
.../odbc/jdbc/JdbcBinaryTypeNameGetRequest.java | 93 ++++
.../odbc/jdbc/JdbcBinaryTypeNameGetResult.java | 92 +++
.../odbc/jdbc/JdbcBinaryTypeNamePutRequest.java | 109 ++++
.../odbc/jdbc/JdbcBinaryTypePutRequest.java | 88 +++
.../odbc/jdbc/JdbcConnectionContext.java | 2 +-
.../processors/odbc/jdbc/JdbcMessageParser.java | 12 +-
.../processors/odbc/jdbc/JdbcProtocolContext.java | 28 +-
.../internal/processors/odbc/jdbc/JdbcQuery.java | 5 +-
.../odbc/jdbc/JdbcQueryExecuteRequest.java | 4 +-
.../internal/processors/odbc/jdbc/JdbcRequest.java | 32 ++
.../processors/odbc/jdbc/JdbcRequestHandler.java | 118 ++++
.../internal/processors/odbc/jdbc/JdbcResult.java | 24 +
.../processors/odbc/jdbc/JdbcThinFeature.java | 8 +-
.../odbc/jdbc/JdbcUpdateBinarySchemaResult.java | 89 +++
.../internal/processors/odbc/jdbc/JdbcUtils.java | 24 +-
.../processors/platform/utils/PlatformUtils.java | 2 +-
.../ignite/testframework/junits/IgniteMock.java | 2 +-
39 files changed, 1968 insertions(+), 215 deletions(-)
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
index bf7c2e7..65aeddb 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
@@ -39,16 +39,23 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler;
import org.apache.ignite.internal.jdbc.thin.ConnectionProperties;
import org.apache.ignite.internal.jdbc.thin.ConnectionPropertiesImpl;
import org.apache.ignite.internal.jdbc.thin.JdbcThinConnection;
import org.apache.ignite.internal.jdbc.thin.JdbcThinTcpIo;
import org.apache.ignite.internal.util.HostAndPortRange;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.marshaller.MarshallerContext;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.testframework.GridStringLogger;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
@@ -2188,7 +2195,7 @@ public class JdbcThinConnectionSelfTest extends
JdbcThinAbstractSelfTest {
assertThrows(null, new Callable<Object>() {
@SuppressWarnings("ResultOfObjectAllocationIgnored")
@Override public Object call() throws Exception {
- new JdbcThinTcpIo(connProps, new InetSocketAddress(LOCALHOST,
DFLT_PORT), 0);
+ new JdbcThinTcpIo(connProps, new InetSocketAddress(LOCALHOST,
DFLT_PORT), getBinaryContext(), 0);
return null;
}
@@ -2282,4 +2289,63 @@ public class JdbcThinConnectionSelfTest extends
JdbcThinAbstractSelfTest {
}
};
}
+
+ /**
+ * Returns stub for marshaller context.
+ *
+ * @return Marshaller context.
+ */
+ private MarshallerContext getFakeMarshallerCtx() {
+ return new MarshallerContext() {
+ @Override public boolean registerClassName(byte platformId, int
typeId,
+ String clsName) throws IgniteCheckedException {
+ return false;
+ }
+
+ @Override public boolean registerClassNameLocally(byte platformId,
int typeId,
+ String clsName) throws IgniteCheckedException {
+ return false;
+ }
+
+ @Override public Class getClass(int typeId, ClassLoader ldr)
throws ClassNotFoundException, IgniteCheckedException {
+ return null;
+ }
+
+ @Override public String getClassName(byte platformId,
+ int typeId) throws ClassNotFoundException,
IgniteCheckedException {
+ return null;
+ }
+
+ @Override public boolean isSystemType(String typeName) {
+ return false;
+ }
+
+ @Override public IgnitePredicate<String> classNameFilter() {
+ return null;
+ }
+
+ @Override public JdkMarshaller jdkMarshaller() {
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Returns new binary context.
+ *
+ * @return New binary context.
+ */
+ private BinaryContext getBinaryContext() {
+ BinaryMarshaller marsh = new BinaryMarshaller();
+
+ marsh.setContext(getFakeMarshallerCtx());
+
+ BinaryContext ctx = new
BinaryContext(BinaryNoopMetadataHandler.instance(),
+ new IgniteConfiguration(), new NullLogger());
+
+ ctx.configure(marsh);
+ ctx.registerUserTypesSchema();
+
+ return ctx;
+ }
}
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java
index 48cdc12..5e1b757 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java
@@ -34,14 +34,20 @@ import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
+import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcThinFeature;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.RunnableX;
+import org.junit.Assert;
import org.junit.Test;
import static java.sql.Types.BIGINT;
@@ -59,6 +65,8 @@ import static java.sql.Types.TINYINT;
import static java.sql.Types.VARCHAR;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
/**
* Prepared statement test.
@@ -66,12 +74,12 @@ import static
org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@SuppressWarnings("ThrowableNotThrown")
public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest {
/** URL. */
- private static final String URL = "jdbc:ignite:thin://127.0.0.1/";
+ private static final String URL = "jdbc:ignite:thin://127.0.0.1";
/** SQL query. */
private static final String SQL_PART =
"select id, boolVal, byteVal, shortVal, intVal, longVal, floatVal, " +
- "doubleVal, bigVal, strVal, arrVal, dateVal, timeVal, tsVal " +
+ "doubleVal, bigVal, strVal, arrVal, dateVal, timeVal, tsVal,
objVal " +
"from TestObject ";
/** Connection. */
@@ -124,6 +132,7 @@ public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest
o.timeVal = new Time(1);
o.tsVal = new Timestamp(1);
o.urlVal = new URL("http://abc.com/");
+ o.objVal = new TestObjectField(100, "AAAA");
cache.put(1, o);
cache.put(2, new TestObject(2));
@@ -131,9 +140,7 @@ public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- conn = DriverManager.getConnection(URL);
-
- conn.setSchema('"' + DEFAULT_CACHE_NAME + '"');
+ conn = createConnection(false);
assert conn != null;
assert !conn.isClosed();
@@ -155,6 +162,40 @@ public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest
}
/**
+ * Create new JDBC connection to the grid.
+ *
+ * @param keepBinary Whether to keep bin object in binary format.
+ * @return New connection.
+ */
+ private Connection createConnection(boolean keepBinary) throws
SQLException {
+ String url = keepBinary ? URL + "?keepBinary=true" : URL;
+
+ Connection conn = DriverManager.getConnection(url);
+
+ conn.setSchema('"' + DEFAULT_CACHE_NAME + '"');
+
+ return conn;
+ }
+
+ /**
+ * Create new JDBC connection to the grid.
+ *
+ * @param disabledFeatues Features that should be disabled.
+ * @return New connection.
+ */
+ private Connection createConnection(JdbcThinFeature... disabledFeatues)
throws SQLException {
+ String url = URL + "?disabledFeatures=" +
Arrays.stream(disabledFeatues)
+ .map(JdbcThinFeature::name)
+ .collect(Collectors.joining(","));
+
+ Connection conn = DriverManager.getConnection(url);
+
+ conn.setSchema('"' + DEFAULT_CACHE_NAME + '"');
+
+ return conn;
+ }
+
+ /**
* @throws Exception If failed.
*/
@Test
@@ -191,6 +232,171 @@ public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest
}
/**
+ * Ensure binary object's meta is properly synchronized between connections
+ * - start grid
+ * - from one connection create and fill table such one of the
columns was user's object
+ * - from another connection execute query with filter by this object
+ * - verify that result is not empty and returned object is the same
as expected
+ *
+ * @throws SQLException In case of any sql error.
+ */
+ @Test
+ public void testObjectDifferentConnections() throws SQLException {
+ final TestObjectField exp = new TestObjectField(42, "BBBB");
+
+ conn.createStatement().execute("CREATE TABLE test(id INT PRIMARY KEY,
objVal OTHER)");
+
+ stmt = conn.prepareStatement("INSERT INTO test(id, objVal) VALUES (?,
?)");
+
+ stmt.setInt(1, exp.a);
+ stmt.setObject(2, exp);
+
+ stmt.execute();
+
+ try (Connection anotherConn = createConnection(false);
+ PreparedStatement stmt = anotherConn.prepareStatement("SELECT id,
objVal FROM test WHERE id = ?")
+ ) {
+ stmt.setInt(1, exp.a);
+
+ int cnt = 0;
+
+ ResultSet rs = stmt.executeQuery();
+
+ while (rs.next()) {
+ if (cnt == 0) {
+ Assert.assertTrue("Result's value type mismatch",
+ rs.getObject("objVal") instanceof TestObjectField);
+
+ Assert.assertEquals("Result's value mismatch", exp,
rs.getObject("objVal", TestObjectField.class));
+ }
+
+ cnt++;
+ }
+
+ Assert.assertEquals("There should be exactly 1 result", 1, cnt);
+ }
+ }
+
+ /**
+ * Ensure custom objects can be retrieved as {@link BinaryObject}
+ * if keepBinary flag is set to {@code true} on connection
+ * - start grid and create and fill table such one of the columns was
user's object
+ * - from another connection with keepBinary flag set to {@code true}
+ * execute query with filter by this object
+ * - verify that result is not empty and returned object is the
{@link BinaryObject}
+ *
+ * @throws SQLException In case of any sql error.
+ */
+ @Test
+ public void testObjectConnectionWithKeepBinaryFlag() throws SQLException {
+ try (Connection anotherConn = createConnection(true)) {
+ stmt = anotherConn.prepareStatement(SQL_PART + " where objVal is
not distinct from ?");
+
+ stmt.setObject(1, new TestObjectField(100, "AAAA"));
+
+ int cnt = 0;
+
+ ResultSet rs = stmt.executeQuery();
+
+ while (rs.next()) {
+ if (cnt == 0) {
+ Assert.assertEquals("Result's id mismatch", 1,
rs.getInt("id"));
+
+ Assert.assertTrue(rs.getObject("objVal") instanceof
BinaryObject);
+
+ Assert.assertEquals("Result's value mismatch",
Integer.valueOf(100),
+ rs.getObject("objVal", BinaryObject.class).field("a"));
+ }
+
+ cnt++;
+ }
+
+ Assert.assertEquals("There should be exactly 1 result", 1, cnt);
+ }
+ }
+
+ /**
+ * Ensure custom objects can be retrieved through JdbcThinConnection
+ * - start grid and create and fill table such one of the columns was
user's object
+ * - execute query with filter by this object (use both real object
and null for param value)
+ * - verify that result is not empty and returned object is the same
as expected
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testObject() throws Exception {
+ stmt = conn.prepareStatement(SQL_PART + " where objVal is not distinct
from ?");
+
+ stmt.setObject(1, new TestObjectField(100, "AAAA"));
+
+ int cnt = 0;
+
+ ResultSet rs = stmt.executeQuery();
+
+ while (rs.next()) {
+ if (cnt == 0) {
+ Assert.assertEquals("Result's id mismatch", 1,
rs.getInt("id"));
+
+ Assert.assertTrue("Result's value type mismatch",
+ rs.getObject("objVal") instanceof TestObjectField);
+
+ Assert.assertEquals("Result's value mismatch", 100,
+ rs.getObject("objVal", TestObjectField.class).a);
+ }
+
+ cnt++;
+ }
+
+ Assert.assertEquals("There should be exactly 1 result", 1, cnt);
+
+ stmt.setNull(1, Types.JAVA_OBJECT);
+
+ stmt.execute();
+
+ cnt = 0;
+
+ rs = stmt.getResultSet();
+
+ while (rs.next()) {
+ if (cnt == 0) {
+ Assert.assertEquals("Result's id mismatch", 2,
rs.getInt("id"));
+
+ Assert.assertNull("Result's value should be null",
rs.getObject("objVal"));
+ }
+
+ cnt++;
+ }
+
+ Assert.assertEquals("There should be exactly 1 result", 1, cnt);
+ }
+
+ /**
+ * Ensure custom object support could be disabled via disabledFeatures
connection property
+ * - start grid and create and fill table such one of the columns was
user's object
+ * - from another connection with disabledFeatures set to {@link
JdbcThinFeature#CUSTOM_OBJECT}
+ * execute query with filter by this object
+ * - verify that exception is thrown when you try to set custom
object as statement param
+ * @throws SQLException
+ */
+ @Test
+ public void testCustomObjectSupportCanBeDisabled() throws SQLException {
+ try (Connection conn = createConnection(JdbcThinFeature.CUSTOM_OBJECT);
+ PreparedStatement stmt = conn.prepareStatement(SQL_PART + " where
objVal is not distinct from ?")
+ ) {
+ Throwable t = GridTestUtils.assertThrowsWithCause(
+ new RunnableX() {
+ @Override public void runx() throws Exception {
+ stmt.setObject(1, new TestObjectField(100, "AAAA"));
+ }
+ },
+ SQLException.class
+ );
+
+ Assert.assertThat(t.getMessage(), is(containsString("Custom
objects are not supported")));
+ }
+ }
+
+ /**
* @throws Exception If failed.
*/
@Test
@@ -943,45 +1149,11 @@ public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest
}
});
- GridTestUtils.assertThrows(log,
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- stmt.setURL(1, new URL("http://test"));
-
- return null;
- }
- },
- SQLException.class, "Parameter type is unsupported");
-
- GridTestUtils.assertThrows(log,
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- stmt.setObject(1, new TestObject(0));
-
- return null;
- }
- },
- SQLException.class, "Parameter type is unsupported");
-
- GridTestUtils.assertThrows(log,
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- stmt.setObject(1, new TestObject(0), Types.JAVA_OBJECT);
-
- return null;
- }
- },
- SQLException.class, "Parameter type is unsupported");
-
- GridTestUtils.assertThrows(log,
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- stmt.setObject(1, new TestObject(0), Types.JAVA_OBJECT, 0);
-
- return null;
- }
- },
- SQLException.class, "Parameter type is unsupported");
+ checkNotSupported(new RunnableX() {
+ @Override public void runx() throws Exception {
+ stmt.setURL(1, new URL("http://test"));
+ }
+ });
}
/**
@@ -1048,6 +1220,10 @@ public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest
@QuerySqlField
private URL urlVal;
+ /** */
+ @QuerySqlField
+ private TestObjectField objVal;
+
/**
* @param id ID.
*/
@@ -1055,4 +1231,50 @@ public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest
this.id = id;
}
}
+
+ /**
+ * Dummy object represents object field of {@link TestObject TestObject}.
+ */
+ @SuppressWarnings("PackageVisibleField")
+ private static class TestObjectField implements Serializable {
+ /** */
+ final int a;
+
+ /** */
+ final String b;
+
+ /**
+ * @param a A.
+ * @param b B.
+ */
+ private TestObjectField(int a, String b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TestObjectField that = (TestObjectField)o;
+
+ return a == that.a && !(b != null ? !b.equals(that.b) : that.b !=
null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = a;
+
+ res = 31 * res + (b != null ? b.hashCode() : 0);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestObjectField.class, this);
+ }
+ }
}
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
index f7870e7..9adc6a0 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
@@ -42,6 +42,7 @@ import
org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.junit.Assert;
import org.junit.Test;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -61,7 +62,7 @@ public class JdbcThinResultSetSelfTest extends
JdbcThinAbstractSelfTest {
/** SQL query. */
private static final String SQL =
"select id, boolVal, byteVal, shortVal, intVal, longVal, floatVal, " +
- "doubleVal, bigVal, strVal, arrVal, dateVal, timeVal, tsVal " +
+ "doubleVal, bigVal, strVal, arrVal, dateVal, timeVal, tsVal,
objVal " +
"from TestObject where id = 1";
/** Statement. */
@@ -688,14 +689,30 @@ public class JdbcThinResultSetSelfTest extends
JdbcThinAbstractSelfTest {
* @throws Exception If failed.
*/
@Test
- public void testObjectNotSupported() throws Exception {
- assertThrowsAnyCause(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- stmt.executeQuery("select f1 from TestObject where id = 1");
+ public void testObject() throws Exception {
+ ResultSet rs = stmt.executeQuery(SQL);
- return null;
+ int cnt = 0;
+
+ TestObjectField exp = new TestObjectField(100, "AAAA");
+
+ while (rs.next()) {
+ if (cnt == 0) {
+ Assert.assertEquals("Result by column label mismatch", exp,
rs.getObject("objVal"));
+
+ Assert.assertEquals("Result by column index mismatch", exp,
rs.getObject(15));
+
+ Assert.assertEquals("Result by column index with general cast
mismatch",
+ exp, rs.getObject(15, Object.class));
+
+ Assert.assertEquals("Result by column index with precise cast
mismatch",
+ exp, rs.getObject(15, TestObjectField.class));
}
- }, SQLException.class, "Custom objects are not supported");
+
+ cnt++;
+ }
+
+ Assert.assertEquals("Result count mismatch", 1, cnt);
}
/**
@@ -1610,6 +1627,18 @@ public class JdbcThinResultSetSelfTest extends
JdbcThinAbstractSelfTest {
checkResultSetClosed(new RunnableX() {
@Override public void runx() throws Exception {
+ rs.getObject("objVal");
+ }
+ });
+
+ checkResultSetClosed(new RunnableX() {
+ @Override public void runx() throws Exception {
+ rs.getObject("objVal", TestObjectField.class);
+ }
+ });
+
+ checkResultSetClosed(new RunnableX() {
+ @Override public void runx() throws Exception {
rs.wasNull();
}
});
@@ -1729,7 +1758,7 @@ public class JdbcThinResultSetSelfTest extends
JdbcThinAbstractSelfTest {
/** */
@QuerySqlField
- private TestObjectField f1 = new TestObjectField(100, "AAAA");
+ private TestObjectField objVal = new TestObjectField(100, "AAAA");
/** */
@QuerySqlField
@@ -1766,7 +1795,7 @@ public class JdbcThinResultSetSelfTest extends
JdbcThinAbstractSelfTest {
if (byteVal != null ? !byteVal.equals(that.byteVal) : that.byteVal
!= null) return false;
if (dateVal != null ? !dateVal.equals(that.dateVal) : that.dateVal
!= null) return false;
if (doubleVal != null ? !doubleVal.equals(that.doubleVal) :
that.doubleVal != null) return false;
- if (f1 != null ? !f1.equals(that.f1) : that.f1 != null) return
false;
+ if (objVal != null ? !objVal.equals(that.objVal) : that.objVal !=
null) return false;
if (f2 != null ? !f2.equals(that.f2) : that.f2 != null) return
false;
if (f3 != null ? !f3.equals(that.f3) : that.f3 != null) return
false;
if (floatVal != null ? !floatVal.equals(that.floatVal) :
that.floatVal != null) return false;
@@ -1800,7 +1829,7 @@ public class JdbcThinResultSetSelfTest extends
JdbcThinAbstractSelfTest {
res = 31 * res + (timeVal != null ? timeVal.hashCode() : 0);
res = 31 * res + (tsVal != null ? tsVal.hashCode() : 0);
res = 31 * res + (urlVal != null ? urlVal.hashCode() : 0);
- res = 31 * res + (f1 != null ? f1.hashCode() : 0);
+ res = 31 * res + (objVal != null ? objVal.hashCode() : 0);
res = 31 * res + (f2 != null ? f2.hashCode() : 0);
res = 31 * res + (f3 != null ? f3.hashCode() : 0);
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
index a93b230..5196818 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
@@ -19,12 +19,14 @@ package org.apache.ignite.jdbc.thin;
import java.sql.Connection;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
@@ -40,6 +42,7 @@ import
org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
+import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -399,6 +402,58 @@ public abstract class JdbcThinStreamingAbstractSelfTest
extends JdbcStreamingSel
}
/**
+ * Ensure custom object can be serialized in streaming mode
+ * - start grid
+ * - create table such one of the columns was user's object
+ * - enable streaming and fill the table
+ * - disable streaming and query random row such it should be
presented in the table
+ * - verify returned object
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCustomObject() throws Exception {
+ try (Connection conn = createOrdinaryConnection()) {
+ execute(conn, "CREATE TABLE t2(id INT PRIMARY KEY, val OTHER)");
+ }
+
+ try (Connection conn = createStreamedConnection(false, 10000)) {
+ assertStreamingState(true);
+
+ int testInd = 1 + new Random().nextInt(1000);
+
+ try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO
t2 values (?, ?)")) {
+ for (int i = 1; i <= 1000; i++) {
+ stmt.setInt(1, i);
+ stmt.setObject(2, i == testInd ? new Foo(testInd) : null);
+
+ stmt.executeUpdate();
+ }
+ }
+
+ assertCacheEmpty();
+
+ execute(conn, "set streaming 0");
+
+ assertStreamingState(false);
+
+ U.sleep(500);
+
+ try (PreparedStatement stmt = conn.prepareStatement("SELECT val
FROM t2 WHERE id = ?")) {
+ stmt.setInt(1, testInd);
+
+ ResultSet rs = stmt.executeQuery();
+
+ Assert.assertTrue("Result should not be empty", rs.next());
+
+ Foo foo = rs.getObject(1, Foo.class);
+
+ Assert.assertEquals("Stored value not equals the expected
one", testInd, foo.val);
+ }
+ }
+ }
+
+ /**
* @throws SQLException if failed.
*/
@Test
@@ -593,4 +648,17 @@ public abstract class JdbcThinStreamingAbstractSelfTest
extends JdbcStreamingSel
);
}
}
+
+ /**
+ * Dummy class to use as custom object field.
+ */
+ static class Foo {
+ /** */
+ int val;
+
+ /** */
+ public Foo(int val) {
+ this.val = val;
+ }
+ }
}
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTcpIoTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTcpIoTest.java
index 49a50be..f521c1f 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTcpIoTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTcpIoTest.java
@@ -44,7 +44,7 @@ public class JdbcThinTcpIoTest extends GridCommonAbstractTest
{
try {
jdbcThinTcpIo = new JdbcThinTcpIo(new ConnectionPropertiesImpl(),
- new InetSocketAddress("127.0.0.1", 10800), 500);
+ new InetSocketAddress("127.0.0.1", 10800), null, 500);
}
finally {
if (jdbcThinTcpIo != null)
@@ -63,7 +63,7 @@ public class JdbcThinTcpIoTest extends GridCommonAbstractTest
{
@SuppressWarnings("ResultOfObjectAllocationIgnored")
@Override public Object call() throws Exception {
new JdbcThinTcpIo(new ConnectionPropertiesImpl(),
- new InetSocketAddress("123.45.67.89", 10800), 500);
+ new InetSocketAddress("123.45.67.89", 10800), null, 500);
return null;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index 825da30..90ab4a8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -398,10 +398,18 @@ public class BinaryContext {
/**
* @param marsh Binary marshaller.
- * @param cfg Configuration.
* @throws BinaryObjectException In case of error.
*/
- public void configure(BinaryMarshaller marsh, IgniteConfiguration cfg)
throws BinaryObjectException {
+ public void configure(BinaryMarshaller marsh) throws BinaryObjectException
{
+ configure(marsh, null);
+ }
+
+ /**
+ * @param marsh Binary marshaller.
+ * @param binaryCfg Binary configuration.
+ * @throws BinaryObjectException In case of error.
+ */
+ public void configure(BinaryMarshaller marsh, BinaryConfiguration
binaryCfg) throws BinaryObjectException {
if (marsh == null)
return;
@@ -409,8 +417,6 @@ public class BinaryContext {
marshCtx = marsh.getContext();
- BinaryConfiguration binaryCfg = cfg.getBinaryConfiguration();
-
if (binaryCfg == null)
binaryCfg = new BinaryConfiguration();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java
index bffd35a..ca5f8f0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java
@@ -74,7 +74,7 @@ public class BinaryMarshaller extends
AbstractNodeNameAwareMarshaller {
* @param ctx Binary context.
*/
private void setBinaryContext(BinaryContext ctx, IgniteConfiguration cfg) {
- ctx.configure(this, cfg);
+ ctx.configure(this, cfg != null ? cfg.getBinaryConfiguration() : null);
impl = new GridBinaryMarshaller(ctx);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
index dc84328..2f0ff23 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
@@ -1927,9 +1927,6 @@ public class BinaryReaderExImpl implements BinaryReader,
BinaryRawReaderEx, Bina
((BinaryObjectImpl)obj).context(ctx);
- if (!GridBinaryMarshaller.KEEP_BINARIES.get())
- obj = ((BinaryObject)obj).deserialize();
-
break;
case ENUM:
@@ -1945,9 +1942,6 @@ public class BinaryReaderExImpl implements BinaryReader,
BinaryRawReaderEx, Bina
case BINARY_ENUM:
obj = BinaryUtils.doReadBinaryEnum(in, ctx);
- if (!GridBinaryMarshaller.KEEP_BINARIES.get())
- obj = ((BinaryObject)obj).deserialize();
-
break;
case CLASS:
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
index 568a605..331d1ef 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
@@ -32,9 +32,6 @@ import org.jetbrains.annotations.Nullable;
* Binary objects marshaller.
*/
public class GridBinaryMarshaller {
- /** */
- public static final ThreadLocal<Boolean> KEEP_BINARIES =
ThreadLocal.withInitial(() -> true);
-
/** Binary context in TLS store. */
private static final ThreadLocal<BinaryContextHolder> BINARY_CTX =
ThreadLocal.withInitial(BinaryContextHolder::new);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinaryMarshaller.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinaryMarshaller.java
index 99ba66d..65e4a72 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinaryMarshaller.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinaryMarshaller.java
@@ -114,7 +114,7 @@ class ClientBinaryMarshaller {
marsh.setContext(marshCtx);
- ctx.configure(marsh, igniteCfg);
+ ctx.configure(marsh, binCfg);
ctx.registerUserTypesSchema();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
index 8f12042..d8ddfc2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
@@ -542,4 +542,20 @@ public interface ConnectionProperties {
* The string should contain enumeration of feature names, separated
by the comma.
*/
public void disabledFeatures(String features);
+
+ /**
+ * Get keep binary configuration flag.
+ *
+ * @return Keep binary configuration flag.
+ */
+ public boolean isKeepBinary();
+
+ /**
+ * Set to {@code true} to keep binary objects in binary form.
+ *
+ * <p> Defaults is {@code false}.
+ **
+ * @param keepBinary Whether to keep binary objects in binary form.
+ */
+ public void setKeepBinary(boolean keepBinary);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
index 155994c..d9000648 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
@@ -254,6 +254,10 @@ public class ConnectionPropertiesImpl implements
ConnectionProperties, Serializa
}
});
+ /** Keep binary objects in binary form. */
+ private BooleanProperty keepBinary = new BooleanProperty("keepBinary",
+ "Whether to keep binary objects in binary form.", false, false);
+
/** Properties array. */
private final ConnectionProperty[] propsArray = {
distributedJoins, enforceJoinOrder, collocated, replicatedOnly,
autoCloseServerCursor,
@@ -271,7 +275,8 @@ public class ConnectionPropertiesImpl implements
ConnectionProperties, Serializa
partitionAwarenessPartDistributionsCacheSize,
qryTimeout,
connTimeout,
- disabledFeatures
+ disabledFeatures,
+ keepBinary
};
/** {@inheritDoc} */
@@ -670,6 +675,16 @@ public class ConnectionPropertiesImpl implements
ConnectionProperties, Serializa
disabledFeatures.setValue(features);
}
+ /** {@inheritDoc} */
+ @Override public boolean isKeepBinary() {
+ return keepBinary.value();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setKeepBinary(boolean keepBinary) {
+ this.keepBinary.setValue(keepBinary);
+ }
+
/**
* @param url URL connection.
* @param props Environment properties.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index 5ab9b49..c35ee28 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -44,6 +44,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
@@ -51,7 +52,10 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -65,13 +69,32 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.MarshallerPlatformIds;
+import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.binary.BinaryMetadataHandler;
+import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.jdbc2.JdbcUtils;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
+import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcBinaryTypeGetRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBinaryTypeGetResult;
+import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcBinaryTypeNameGetRequest;
+import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcBinaryTypeNameGetResult;
+import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcBinaryTypeNamePutRequest;
+import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcBinaryTypePutRequest;
import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest;
import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcCachePartitionsRequest;
import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcCachePartitionsResult;
@@ -83,8 +106,10 @@ import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultWithIo;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
+import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcUpdateBinarySchemaResult;
import org.apache.ignite.internal.sql.command.SqlCommand;
import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
import
org.apache.ignite.internal.sql.optimizer.affinity.PartitionClientContext;
@@ -92,7 +117,12 @@ import
org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
import org.apache.ignite.internal.util.HostAndPortRange;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.marshaller.MarshallerContext;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.Nullable;
import static java.sql.ResultSet.CLOSE_CURSORS_AT_COMMIT;
@@ -103,6 +133,7 @@ import static
org.apache.ignite.internal.processors.odbc.SqlStateCode.CLIENT_CON
import static
org.apache.ignite.internal.processors.odbc.SqlStateCode.CONNECTION_CLOSED;
import static
org.apache.ignite.internal.processors.odbc.SqlStateCode.CONNECTION_FAILURE;
import static
org.apache.ignite.internal.processors.odbc.SqlStateCode.INTERNAL_ERROR;
+import static
org.apache.ignite.marshaller.MarshallerUtils.processSystemClasses;
/**
* JDBC connection implementation.
@@ -218,6 +249,15 @@ public class JdbcThinConnection implements Connection {
/** Connections handler timer. */
private final IgniteProductVersion baseEndpointVer;
+ /** Binary context. */
+ private final BinaryContext ctx;
+
+ /** Binary metadata handler. */
+ private final JdbcBinaryMetadataHandler metaHnd;
+
+ /** Marshaller context. */
+ private final JdbcMarshallerContext marshCtx;
+
/**
* Creates new connection.
*
@@ -227,6 +267,9 @@ public class JdbcThinConnection implements Connection {
public JdbcThinConnection(ConnectionProperties connProps) throws
SQLException {
this.connProps = connProps;
+ metaHnd = new JdbcBinaryMetadataHandler();
+ marshCtx = new JdbcMarshallerContext();
+ ctx = createBinaryCtx(metaHnd, marshCtx);
holdability = HOLD_CURSORS_OVER_COMMIT;
autoCommit = true;
txIsolation = Connection.TRANSACTION_NONE;
@@ -250,6 +293,22 @@ public class JdbcThinConnection implements Connection {
}
}
+ /** Create new binary context. */
+ private BinaryContext createBinaryCtx(JdbcBinaryMetadataHandler metaHnd,
JdbcMarshallerContext marshCtx) {
+ BinaryMarshaller marsh = new BinaryMarshaller();
+ marsh.setContext(marshCtx);
+
+ BinaryConfiguration binCfg = new
BinaryConfiguration().setCompactFooter(true);
+
+ BinaryContext ctx = new BinaryContext(metaHnd, new
IgniteConfiguration(), new NullLogger());
+
+ ctx.configure(marsh, binCfg);
+
+ ctx.registerUserTypesSchema();
+
+ return ctx;
+ }
+
/**
* @throws SQLException On connection error.
*/
@@ -899,99 +958,91 @@ public class JdbcThinConnection implements Connection {
RequestTimeoutTask reqTimeoutTask = null;
- synchronized (mux) {
- if (ownThread != null) {
- throw new SQLException("Concurrent access to JDBC connection
is not allowed"
- + " [ownThread=" + ownThread.getName()
- + ", curThread=" + Thread.currentThread().getName(),
CONNECTION_FAILURE);
- }
+ acquireMutex();
- ownThread = Thread.currentThread();
- }
try {
- try {
- int retryAttemptsLeft = 1;
+ int retryAttemptsLeft = 1;
- Exception lastE = null;
+ Exception lastE = null;
- while (retryAttemptsLeft > 0) {
- JdbcThinTcpIo cliIo = null;
+ while (retryAttemptsLeft > 0) {
+ JdbcThinTcpIo cliIo = null;
- ensureConnected();
+ ensureConnected();
- try {
- cliIo = (stickyIo == null || !stickyIo.connected()) ?
cliIo(calculateNodeIds(req)) : stickyIo;
-
- if (stmt != null && stmt.requestTimeout() !=
NO_TIMEOUT) {
- reqTimeoutTask = new RequestTimeoutTask(
- req instanceof JdbcBulkLoadBatchRequest ?
stmt.currentRequestId() : req.requestId(),
- cliIo,
- stmt.requestTimeout());
-
- qryTimeoutScheduledFut =
maintenanceExecutor.scheduleAtFixedRate(reqTimeoutTask, 0,
- REQUEST_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS);
- }
+ try {
+ cliIo = (stickyIo == null || !stickyIo.connected()) ?
cliIo(calculateNodeIds(req)) : stickyIo;
- JdbcQueryExecuteRequest qryReq = null;
+ if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT) {
+ reqTimeoutTask = new RequestTimeoutTask(
+ req instanceof JdbcBulkLoadBatchRequest ?
stmt.currentRequestId() : req.requestId(),
+ cliIo,
+ stmt.requestTimeout());
- if (req instanceof JdbcQueryExecuteRequest)
- qryReq = (JdbcQueryExecuteRequest)req;
+ qryTimeoutScheduledFut =
maintenanceExecutor.scheduleAtFixedRate(reqTimeoutTask, 0,
+ REQUEST_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS);
+ }
- JdbcResponse res = cliIo.sendRequest(req, stmt);
+ JdbcQueryExecuteRequest qryReq = null;
- txIo = res.activeTransaction() ? cliIo : null;
+ if (req instanceof JdbcQueryExecuteRequest)
+ qryReq = (JdbcQueryExecuteRequest)req;
- if (res.status() ==
IgniteQueryErrorCode.QUERY_CANCELED && stmt != null &&
- stmt.requestTimeout() != NO_TIMEOUT &&
reqTimeoutTask != null &&
- reqTimeoutTask.expired.get()) {
+ JdbcResponse res = cliIo.sendRequest(req, stmt);
- throw new
SQLTimeoutException(QueryCancelledException.ERR_MSG,
SqlStateCode.QUERY_CANCELLED,
- IgniteQueryErrorCode.QUERY_CANCELED);
- }
- else if (res.status() !=
ClientListenerResponse.STATUS_SUCCESS)
- throw new SQLException(res.error(),
IgniteQueryErrorCode.codeToSqlState(res.status()),
- res.status());
+ txIo = res.activeTransaction() ? cliIo : null;
- updateAffinityCache(qryReq, res);
+ if (res.status() == IgniteQueryErrorCode.QUERY_CANCELED &&
stmt != null &&
+ stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask
!= null &&
+ reqTimeoutTask.expired.get()) {
- return new JdbcResultWithIo(res.response(), cliIo);
+ throw new
SQLTimeoutException(QueryCancelledException.ERR_MSG,
SqlStateCode.QUERY_CANCELLED,
+ IgniteQueryErrorCode.QUERY_CANCELED);
}
- catch (SQLException e) {
- if (LOG.isLoggable(Level.FINE))
- LOG.log(Level.FINE, "Exception during sending an
sql request.", e);
+ else if (res.status() !=
ClientListenerResponse.STATUS_SUCCESS)
+ throw new SQLException(res.error(),
IgniteQueryErrorCode.codeToSqlState(res.status()),
+ res.status());
- throw e;
- }
- catch (Exception e) {
- if (LOG.isLoggable(Level.FINE))
- LOG.log(Level.FINE, "Exception during sending an
sql request.", e);
+ updateAffinityCache(qryReq, res);
+
+ return new JdbcResultWithIo(res.response(), cliIo);
+ }
+ catch (SQLException e) {
+ if (LOG.isLoggable(Level.FINE))
+ LOG.log(Level.FINE, "Exception during sending an sql
request.", e);
+ throw e;
+ }
+ catch (Exception e) {
+ if (LOG.isLoggable(Level.FINE))
+ LOG.log(Level.FINE, "Exception during sending an sql
request.", e);
+
+ // We reuse the same connection when deals with binary
objects to synchronize the binary schema,
+ // so if any error occurred during synchronization, we
close the underlying IO when handling problem
+ // for the first time and should skip it during next
processing
+ if (cliIo != null && cliIo.connected())
onDisconnect(cliIo);
- if (e instanceof SocketTimeoutException)
- throw new SQLException("Connection timed out.",
CONNECTION_FAILURE, e);
- else {
- if (lastE == null) {
- retryAttemptsLeft =
calculateRetryAttemptsCount(stickyIo, req);
- lastE = e;
- }
- else
- retryAttemptsLeft--;
+ if (e instanceof SocketTimeoutException)
+ throw new SQLException("Connection timed out.",
CONNECTION_FAILURE, e);
+ else {
+ if (lastE == null) {
+ retryAttemptsLeft =
calculateRetryAttemptsCount(stickyIo, req);
+ lastE = e;
}
+ else
+ retryAttemptsLeft--;
}
}
-
- throw new SQLException("Failed to communicate with Ignite
cluster.", CONNECTION_FAILURE, lastE);
- }
- finally {
- if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT &&
reqTimeoutTask != null)
- qryTimeoutScheduledFut.cancel(false);
}
+
+ throw new SQLException("Failed to communicate with Ignite
cluster.", CONNECTION_FAILURE, lastE);
}
finally {
- synchronized (mux) {
- ownThread = null;
- }
+ if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT &&
reqTimeoutTask != null)
+ qryTimeoutScheduledFut.cancel(false);
+
+ releaseMutex();
}
}
@@ -1156,22 +1207,14 @@ public class JdbcThinConnection implements Connection {
* @param stickyIO Sticky ignite endpoint.
* @throws SQLException On any error.
*/
- private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest
req, JdbcThinTcpIo stickyIO)
+ private void sendRequestNotWaitResponse(JdbcRequest req, JdbcThinTcpIo
stickyIO)
throws SQLException {
ensureConnected();
- synchronized (mux) {
- if (ownThread != null) {
- throw new SQLException("Concurrent access to JDBC connection
is not allowed"
- + " [ownThread=" + ownThread.getName()
- + ", curThread=" + Thread.currentThread().getName(),
CONNECTION_FAILURE);
- }
-
- ownThread = Thread.currentThread();
- }
+ acquireMutex();
try {
- stickyIO.sendBatchRequestNoWaitResponse(req);
+ stickyIO.sendRequestNoWaitResponse(req);
}
catch (SQLException e) {
throw e;
@@ -1186,9 +1229,70 @@ public class JdbcThinConnection implements Connection {
CONNECTION_FAILURE, e);
}
finally {
- synchronized (mux) {
- ownThread = null;
+ releaseMutex();
+ }
+ }
+
+ /**
+ * Acquire mutex. Allows subsequent acquire by the same thread.
+ * <p>
+ * How to use:
+ * <pre>
+ * acquireMutex();
+ *
+ * try {
+ * // do some work here
+ * }
+ * finally {
+ * releaseMutex();
+ * }
+ *
+ * </pre>
+ *
+ * @throws SQLException If mutex already acquired by another thread.
+ * @see JdbcThinConnection#releaseMutex()
+ */
+ private void acquireMutex() throws SQLException {
+ synchronized (mux) {
+ Thread curr = Thread.currentThread();
+
+ if (ownThread != null && ownThread != curr) {
+ throw new SQLException("Concurrent access to JDBC connection
is not allowed"
+ + " [ownThread=" + ownThread.getName()
+ + ", curThread=" + curr.getName(), CONNECTION_FAILURE);
}
+
+ ownThread = curr;
+ }
+ }
+
+ /**
+ * Release mutex. Does nothing if nobody own the mutex.
+ * <p>
+ * How to use:
+ * <pre>
+ * acquireMutex();
+ *
+ * try {
+ * // do some work here
+ * }
+ * finally {
+ * releaseMutex();
+ * }
+ *
+ * </pre>
+ *
+ * @throws IllegalStateException If mutex is owned by another thread.
+ * @see JdbcThinConnection#acquireMutex()
+ */
+ private void releaseMutex() {
+ synchronized (mux) {
+ Thread curr = Thread.currentThread();
+
+ if (ownThread != null && ownThread != curr)
+ throw new IllegalStateException("Mutex is owned by another
thread");
+
+ ownThread = null;
}
}
@@ -1397,8 +1501,7 @@ public class JdbcThinConnection implements Connection {
checkError();
}
- /**
- */
+ /** */
void close0() {
if (connCnt.get() > 0) {
try {
@@ -1415,9 +1518,7 @@ public class JdbcThinConnection implements Connection {
asyncRespReaderThread.interrupt();
}
- /**
- *
- */
+ /** */
void readResponses() {
try {
while (true) {
@@ -1441,6 +1542,19 @@ public class JdbcThinConnection implements Connection {
break;
}
}
+ else if (resp.response() instanceof
JdbcBinaryTypeGetResult)
+
metaHnd.handleResult((JdbcBinaryTypeGetResult)resp.response());
+
+ else if (resp.response() instanceof
JdbcBinaryTypeNameGetResult)
+
marshCtx.handleResult((JdbcBinaryTypeNameGetResult)resp.response());
+
+ else if (resp.response() instanceof
JdbcUpdateBinarySchemaResult) {
+ JdbcUpdateBinarySchemaResult binarySchemaRes =
(JdbcUpdateBinarySchemaResult)resp.response();
+
+ if (!marshCtx.handleResult(binarySchemaRes) &&
!metaHnd.handleResult(binarySchemaRes))
+ LOG.log(Level.WARNING, "Neither marshaller context
nor metadata handler" +
+ " wait for update binary schema result (req="
+ binarySchemaRes + ")");
+ }
else if (resp.status() !=
ClientListenerResponse.STATUS_SUCCESS)
err = new SQLException(resp.error(),
IgniteQueryErrorCode.codeToSqlState(resp.status()));
else
@@ -1461,6 +1575,15 @@ public class JdbcThinConnection implements Connection {
}
/**
+ * Whether custom objects are supported or not.
+ *
+ * @return True if custom objects are supported, false otherwise.
+ */
+ boolean isCustomObjectSupported() {
+ return singleIo.isCustomObjectSupported();
+ }
+
+ /**
* @param nodeIds Set of node's UUIDs.
* @return Ignite endpoint to use for request/response transferring.
*/
@@ -1499,9 +1622,10 @@ public class JdbcThinConnection implements Connection {
}
/**
- * Returns random tcpIo, based on random UUID, generated in a custom way
with the help of {@code Random}
- * instead of {@code SecureRandom}. It's valid, cause cryptographically
strong pseudo
- * random number generator is not required in this particular case. {@code
Random} is much faster
+ * Returns random tcpIo, based on random UUID, generated in a custom way
+ * with the help of {@code Random} instead of {@code SecureRandom}. It's
+ * valid, cause cryptographically strong pseudo random number generator is
+ * not required in this particular case. {@code Random} is much faster
* than {@code SecureRandom}.
*
* @return random tcpIo
@@ -1578,8 +1702,7 @@ public class JdbcThinConnection implements Connection {
for (InetAddress addr : addrs) {
for (int port = srv.portFrom(); port <= srv.portTo();
++port) {
try {
- JdbcThinTcpIo cliIo = new JdbcThinTcpIo(connProps,
new InetSocketAddress(addr, port),
- 0);
+ JdbcThinTcpIo cliIo = new JdbcThinTcpIo(connProps,
new InetSocketAddress(addr, port), ctx, 0);
cliIo.timeout(netTimeout);
@@ -1641,15 +1764,18 @@ public class JdbcThinConnection implements Connection {
}
/**
- * Establishes a connection to ignite endpoint, trying all specified hosts
and ports one by one.
+ * Establishes a connection to ignite endpoint, trying all specified hosts
+ * and ports one by one.
+ *
* Stops as soon as all iosArr are established.
*
* @param baseEndpointVer Base endpoint version.
* @return last connected endpoint version.
- * @throws SQLException If failed to connect to at least one ignite
endpoint,
- * or if endpoints versions are less than base endpoint version.
+ * @throws SQLException If failed to connect to at least one ignite
+ * endpoint, or if endpoints versions are less than base endpoint version.
*/
- private IgniteProductVersion
connectInBestEffortAffinityMode(IgniteProductVersion baseEndpointVer) throws
SQLException {
+ private IgniteProductVersion connectInBestEffortAffinityMode(
+ IgniteProductVersion baseEndpointVer) throws SQLException {
List<Exception> exceptions = null;
for (int i = 0; i < connProps.getAddresses().length; i++) {
@@ -1662,7 +1788,7 @@ public class JdbcThinConnection implements Connection {
for (int port = srv.portFrom(); port <= srv.portTo();
++port) {
try {
JdbcThinTcpIo cliIo =
- new JdbcThinTcpIo(connProps, new
InetSocketAddress(addr, port), 0);
+ new JdbcThinTcpIo(connProps, new
InetSocketAddress(addr, port), ctx, 0);
if (!cliIo.isPartitionAwarenessSupported()) {
cliIo.close();
@@ -1766,7 +1892,6 @@ public class JdbcThinConnection implements Connection {
*
* @param stickyIo sticky connection, if any.
* @param req Jdbc request.
- *
* @return retries count.
*/
private int calculateRetryAttemptsCount(JdbcThinTcpIo stickyIo,
JdbcRequest req) {
@@ -1909,7 +2034,7 @@ public class JdbcThinConnection implements Connection {
}
JdbcThinTcpIo cliIo =
- new JdbcThinTcpIo(connProps, new
InetSocketAddress(addr, port), 0);
+ new JdbcThinTcpIo(connProps, new
InetSocketAddress(addr, port), ctx, 0);
if
(!cliIo.isPartitionAwarenessSupported()) {
processDelay(sockAddr);
@@ -2001,4 +2126,303 @@ public class JdbcThinConnection implements Connection {
reconnectionDelaysRemainder.put(sockAddr, delay);
}
}
+
+ /**
+ * JDBC implementation of {@link MarshallerContext}.
+ */
+ private class JdbcMarshallerContext extends BlockingJdbcChannel implements
MarshallerContext {
+ /** Type ID -> class name map. */
+ private final Map<Integer, String> cache = new ConcurrentHashMap<>();
+
+ /** */
+ private final Set<String> sysTypes = new HashSet<>();
+
+ /**
+ * Default constructor.
+ */
+ public JdbcMarshallerContext() {
+ try {
+ processSystemClasses(U.gridClassLoader(), null, sysTypes::add);
+ }
+ catch (IOException e) {
+ throw new IgniteException("Unable to initialize marshaller
context", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean registerClassName(
+ byte platformId,
+ int typeId,
+ String clsName,
+ boolean failIfUnregistered
+ ) throws IgniteCheckedException {
+ assert platformId == MarshallerPlatformIds.JAVA_ID
+ : String.format("Only Java platform is supported
[expPlatformId=%d, actualPlatformId=%d].",
+ MarshallerPlatformIds.JAVA_ID, platformId);
+
+ boolean res = true;
+
+ if (!cache.containsKey(typeId)) {
+ try {
+ JdbcUpdateBinarySchemaResult updateRes = doRequest(
+ new JdbcBinaryTypeNamePutRequest(typeId, platformId,
clsName));
+
+ res = updateRes.success();
+ }
+ catch (ExecutionException | InterruptedException |
ClientException | SQLException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ if (res)
+ cache.put(typeId, clsName);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Deprecated
+ @Override public boolean registerClassName(byte platformId, int typeId,
+ String clsName) throws IgniteCheckedException {
+ return registerClassName(platformId, typeId, clsName, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean registerClassNameLocally(byte platformId, int
typeId, String clsName) {
+ throw new UnsupportedOperationException("registerClassNameLocally
not supported by " + this.getClass().getSimpleName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Class getClass(int typeId, ClassLoader ldr)
+ throws ClassNotFoundException, IgniteCheckedException {
+
+ return U.forName(getClassName(MarshallerPlatformIds.JAVA_ID,
typeId), ldr, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getClassName(byte platformId, int typeId)
throws ClassNotFoundException, IgniteCheckedException {
+ assert platformId == MarshallerPlatformIds.JAVA_ID
+ : String.format("Only Java platform is supported
[expPlatformId=%d, actualPlatformId=%d].", MarshallerPlatformIds.JAVA_ID,
platformId);
+
+ String clsName = cache.get(typeId);
+ if (clsName == null) {
+ try {
+ JdbcBinaryTypeNameGetResult res = doRequest(new
JdbcBinaryTypeNameGetRequest(typeId, platformId));
+ clsName = res.typeName();
+ }
+ catch (ExecutionException | InterruptedException |
ClientException | SQLException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ if (clsName == null)
+ throw new ClassNotFoundException(String.format("Unknown type
id [%s]", typeId));
+
+ return clsName;
+ }
+
+ /**
+ * Handle update binary schema result.
+ *
+ * @param res Result.
+ * @return {@code true} if marshaller was waiting for result with
given request ID.
+ */
+ public boolean handleResult(JdbcUpdateBinarySchemaResult res) {
+ return handleResult(res.reqId(), res);
+ }
+
+ /**
+ * Handle binary type name result.
+ *
+ * @param res Result.
+ * @return {@code true} if marshaller was waiting for result with
given request ID.
+ */
+ public boolean handleResult(JdbcBinaryTypeNameGetResult res) {
+ return handleResult(res.reqId(), res);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isSystemType(String typeName) {
+ return sysTypes.contains(typeName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgnitePredicate<String> classNameFilter() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public JdkMarshaller jdkMarshaller() {
+ return new JdkMarshaller();
+ }
+ }
+
+ /**
+ * JDBC implementation of {@link BinaryMetadataHandler}.
+ */
+ private class JdbcBinaryMetadataHandler extends BlockingJdbcChannel
implements BinaryMetadataHandler {
+ /** In-memory metadata cache. */
+ private final BinaryMetadataHandler cache =
BinaryCachingMetadataHandler.create();
+
+ /** {@inheritDoc} */
+ @Override public void addMeta(int typeId, BinaryType meta, boolean
failIfUnregistered)
+ throws BinaryObjectException {
+ try {
+ doRequest(new
JdbcBinaryTypePutRequest(((BinaryTypeImpl)meta).metadata()));
+ }
+ catch (ExecutionException | InterruptedException | ClientException
| SQLException e) {
+ throw new BinaryObjectException(e);
+ }
+
+ cache.addMeta(typeId, meta, failIfUnregistered); // merge
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addMetaLocally(int typeId, BinaryType meta,
+ boolean failIfUnregistered) throws BinaryObjectException {
+ throw new UnsupportedOperationException("Can't register metadata
locally for thin client.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public BinaryType metadata(int typeId) throws
BinaryObjectException {
+ BinaryType meta = cache.metadata(typeId);
+
+ if (meta == null)
+ meta = getBinaryType(typeId);
+
+ return meta;
+ }
+
+ /** {@inheritDoc} */
+ @Override public BinaryMetadata metadata0(int typeId) throws
BinaryObjectException {
+ BinaryMetadata meta = cache.metadata0(typeId);
+
+ if (meta == null) {
+ BinaryTypeImpl binType = (BinaryTypeImpl)getBinaryType(typeId);
+
+ if (binType != null)
+ meta = binType.metadata();
+ }
+
+ return meta;
+ }
+
+ /**
+ * Request binary type from grid.
+ *
+ * @param typeId Type ID.
+ * @return Binary type.
+ */
+ private @Nullable BinaryType getBinaryType(int typeId) throws
BinaryObjectException {
+ BinaryType binType = null;
+ try {
+ JdbcBinaryTypeGetResult res = doRequest(new
JdbcBinaryTypeGetRequest(typeId));
+
+ BinaryMetadata meta = res.meta();
+
+ if (meta != null) {
+ binType = new BinaryTypeImpl(ctx, meta);
+
+ cache.addMeta(typeId, binType, false);
+ }
+ }
+ catch (ExecutionException | InterruptedException | ClientException
| SQLException e) {
+ throw new BinaryObjectException(e);
+ }
+
+ return binType;
+ }
+
+ /**
+ * Handle update binary schema result.
+ *
+ * @param res Result.
+ * @return {@code true} if handler was waiting for result with given
+ * request ID.
+ */
+ public boolean handleResult(JdbcUpdateBinarySchemaResult res) {
+ return handleResult(res.reqId(), res);
+ }
+
+ /**
+ * Handle binary type schema result.
+ *
+ * @param res Result.
+ * @return {@code true} if handler was waiting for result with given
+ * request ID.
+ */
+ public boolean handleResult(JdbcBinaryTypeGetResult res) {
+ return handleResult(res.reqId(), res);
+ }
+
+ /** {@inheritDoc} */
+ @Override public BinaryType metadata(int typeId, int schemaId) throws
BinaryObjectException {
+ BinaryType type = metadata(typeId);
+
+ return type != null &&
((BinaryTypeImpl)type).metadata().hasSchema(schemaId) ? type : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<BinaryType> metadata() throws
BinaryObjectException {
+ return cache.metadata();
+ }
+ }
+
+ /**
+ * Jdbc channel to communicate in blocking style, regardless of whether
+ * streaming mode is enabled or not.
+ */
+ private abstract class BlockingJdbcChannel {
+ /** Request ID -> Jdbc result map. */
+ private Map<Long, CompletableFuture<JdbcResult>> results = new
ConcurrentHashMap<>();
+
+ /**
+ * Do request in blocking style. It just call
+ * {@link JdbcThinConnection#sendRequest(JdbcRequest)} for
non-streaming
+ * mode and creates future and waits it completion when streaming is
+ * enabled.
+ *
+ * @param req Request.
+ * @return Result for given request.
+ */
+ <R extends JdbcResult> R doRequest(JdbcRequest req) throws
SQLException, InterruptedException, ExecutionException {
+ R res;
+
+ if (isStream()) {
+ CompletableFuture<JdbcResult> resFut = new
CompletableFuture<>();
+
+ CompletableFuture<JdbcResult> oldFut =
results.put(req.requestId(), resFut);
+
+ assert oldFut == null : "Another request with the same id is
waiting for result.";
+
+ sendRequestNotWaitResponse(req, streamState.streamingStickyIo);
+
+ res = (R)resFut.get();
+ }
+ else
+ res = sendRequest(req).response();
+
+ return res;
+ }
+
+ /**
+ * Handles result for specified request ID.
+ *
+ * @param reqId Request id.
+ * @param res Result.
+ */
+ boolean handleResult(long reqId, JdbcResult res) {
+ boolean handled = false;
+
+ CompletableFuture<JdbcResult> fut = results.remove(reqId);
+
+ if (fut != null) {
+ fut.complete(res);
+
+ handled = true;
+ }
+
+ return handled;
+ }
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
index 5b9f10f..6c795dd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
@@ -358,7 +358,9 @@ public class JdbcThinPreparedStatement extends
JdbcThinStatement implements Prep
/** {@inheritDoc} */
@Override public void setURL(int paramIdx, URL x) throws SQLException {
- setArgument(paramIdx, x);
+ ensureNotClosed();
+
+ throw new SQLFeatureNotSupportedException("Parameter type is
unsupported. [cls=" + URL.class + ']');
}
/** {@inheritDoc} */
@@ -531,8 +533,7 @@ public class JdbcThinPreparedStatement extends
JdbcThinStatement implements Prep
ensureNotClosed();
if (val != null && !SqlListenerUtils.isPlainType(val.getClass()))
- throw new SQLException("Parameter type is unsupported. [cls=" +
val.getClass() + ']',
- SqlStateCode.INVALID_PARAMETER_VALUE);
+ ensureCustomObjectsSupported();
if (paramIdx < 1)
throw new SQLException("Parameter index is invalid: " + paramIdx);
@@ -545,4 +546,14 @@ public class JdbcThinPreparedStatement extends
JdbcThinStatement implements Prep
args.set(paramIdx - 1, val);
}
+
+ /**
+ * Ensures that statement support custom objects.
+ *
+ * @throws SQLException If statement don't support custom objects.
+ */
+ private void ensureCustomObjectsSupported() throws SQLException {
+ if (!conn.isCustomObjectSupported())
+ throw new SQLException("Custom objects are not supported");
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
index df2a045..6a56b98 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
@@ -1829,7 +1829,7 @@ public class JdbcThinResultSet implements ResultSet {
Class<?> cls = val.getClass();
- if (targetCls == cls)
+ if (targetCls.isAssignableFrom(cls))
return val;
else
throw new SQLException("Cannot convert to " +
targetCls.getName() + ": " + val,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
index 5f11619..47439a6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
@@ -47,8 +47,8 @@ import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultInfo;
-import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultWithIo;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.sql.SqlKeyword;
import org.apache.ignite.internal.sql.SqlParseException;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index bb0152b..9129666 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -40,6 +40,7 @@ import
org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryThreadLocalContext;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
@@ -50,7 +51,6 @@ import
org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcProtocolContext;
-import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCancelRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCloseRequest;
@@ -154,20 +154,25 @@ public class JdbcThinTcpIo {
/** Protocol context (version, supported features, etc). */
private JdbcProtocolContext protoCtx;
+ /** Binary context for serialization/deserialization of binary objects. */
+ private final BinaryContext ctx;
+
/**
* Start connection and perform handshake.
*
* @param connProps Connection properties.
* @param sockAddr Socket address.
+ * @param ctx Binary context for proper serialization/deserialization of
binary objects.
* @param timeout Socket connection timeout in ms.
*
* @throws SQLException On connection error or reject.
* @throws IOException On IO error in handshake.
*/
- public JdbcThinTcpIo(ConnectionProperties connProps, InetSocketAddress
sockAddr, int timeout)
+ public JdbcThinTcpIo(ConnectionProperties connProps, InetSocketAddress
sockAddr, BinaryContext ctx, int timeout)
throws SQLException, IOException {
this.connProps = connProps;
this.sockAddr = sockAddr;
+ this.ctx = ctx;
Socket sock = null;
@@ -235,7 +240,7 @@ public class JdbcThinTcpIo {
srvProtoVer = handshakeRes.serverProtocolVersion();
- protoCtx = new JdbcProtocolContext(srvProtoVer,
handshakeRes.features());
+ protoCtx = new JdbcProtocolContext(srvProtoVer,
handshakeRes.features(), connProps.isKeepBinary());
}
/**
@@ -252,7 +257,7 @@ public class JdbcThinTcpIo {
marsh.setContext(new MarshallerContextImpl(null, null));
- ctx.configure(marsh, new IgniteConfiguration());
+ ctx.configure(marsh);
BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx, new
BinaryHeapOutputStream(HANDSHAKE_MSG_SIZE),
null, null);
@@ -444,7 +449,7 @@ public class JdbcThinTcpIo {
* @throws IOException In case of IO error.
* @throws SQLException On error.
*/
- void sendBatchRequestNoWaitResponse(JdbcOrderedBatchExecuteRequest req)
throws IOException, SQLException {
+ void sendRequestNoWaitResponse(JdbcRequest req) throws IOException,
SQLException {
if (!isUnorderedStreamSupported()) {
throw new SQLException("Streaming without response doesn't
supported by server [driverProtocolVer="
+ CURRENT_VER + ", remoteNodeVer=" + igniteVer + ']',
SqlStateCode.INTERNAL_ERROR);
@@ -500,8 +505,7 @@ public class JdbcThinTcpIo {
* @throws IOException In case of IO error.
*/
JdbcResponse readResponse() throws IOException {
- BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new
BinaryHeapInputStream(read()), null,
- null, false);
+ BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new
BinaryHeapInputStream(read()), null, true);
JdbcResponse res = new JdbcResponse();
@@ -546,8 +550,8 @@ public class JdbcThinTcpIo {
private void sendRequestRaw(JdbcRequest req) throws IOException {
int cap = guessCapacity(req);
- BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new
BinaryHeapOutputStream(cap),
- null, null);
+ BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx, new
BinaryHeapOutputStream(cap),
+ BinaryThreadLocalContext.get().schemaHolder(), null);
req.writeBinary(writer, protoCtx);
@@ -667,6 +671,15 @@ public class JdbcThinTcpIo {
}
/**
+ * Whether custom objects are supported by the server or not.
+ *
+ * @return {@code true} if custom objects are supported, {@code false}
otherwise.
+ */
+ boolean isCustomObjectSupported() {
+ return protoCtx.isFeatureSupported(JdbcThinFeature.CUSTOM_OBJECT);
+ }
+
+ /**
* Get next server index.
*
* @param len Number of servers.
@@ -722,7 +735,10 @@ public class JdbcThinTcpIo {
return connected;
}
- /** */
+ /**
+ * Set of features enabled on clien side. To get features
+ * supported by both sides use {@link #protoCtx}.
+ */
private EnumSet<JdbcThinFeature> enabledFeatures() {
EnumSet<JdbcThinFeature> features =
JdbcThinFeature.allFeaturesAsEnumSet();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
index 8bd4b07..a159043 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
@@ -1819,7 +1819,7 @@ public class JdbcResultSet implements ResultSet {
Class<?> cls = val.getClass();
- if (targetCls == cls)
+ if (targetCls.isAssignableFrom(cls))
return val;
else
throw new SQLException("Cannot convert to " +
targetCls.getName() + ": " + val,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index caa3f6e..5052728 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -285,7 +285,7 @@ public class ClientListenerNioListener extends
GridNioServerListenerAdapter<byte
marsh.setContext(new MarshallerContextImpl(null, null));
- ctx.configure(marsh, new IgniteConfiguration());
+ ctx.configure(marsh);
BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new
BinaryHeapInputStream(msg), null, true);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
index 5749c61..07da232 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.UUID;
+import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryUtils;
@@ -41,9 +42,21 @@ public abstract class SqlListenerUtils {
*/
@Nullable public static Object readObject(BinaryReaderExImpl reader,
boolean binObjAllow)
throws BinaryObjectException {
+ return readObject(reader, binObjAllow,true);
+ }
+
+ /**
+ * @param reader Reader.
+ * @param binObjAllow Allow to read non plaint objects.
+ * @param keepBinary Whether to deserialize objects or keep in binary
format.
+ * @return Read object.
+ * @throws BinaryObjectException On error.
+ */
+ @Nullable public static Object readObject(BinaryReaderExImpl reader,
boolean binObjAllow, boolean keepBinary)
+ throws BinaryObjectException {
byte type = reader.readByte();
- return readObject(type, reader, binObjAllow);
+ return readObject(type, reader, binObjAllow, keepBinary);
}
/**
@@ -53,8 +66,8 @@ public abstract class SqlListenerUtils {
* @return Read object.
* @throws BinaryObjectException On error.
*/
- @Nullable public static Object readObject(byte type, BinaryReaderExImpl
reader, boolean binObjAllow)
- throws BinaryObjectException {
+ @Nullable public static Object readObject(byte type, BinaryReaderExImpl
reader, boolean binObjAllow,
+ boolean keepBinary) throws BinaryObjectException {
switch (type) {
case GridBinaryMarshaller.NULL:
return null;
@@ -146,8 +159,13 @@ public abstract class SqlListenerUtils {
default:
reader.in().position(reader.in().position() - 1);
- if (binObjAllow)
- return reader.readObjectDetached();
+ if (binObjAllow) {
+ Object res = reader.readObjectDetached();
+
+ return !keepBinary && res instanceof BinaryObject
+ ? ((BinaryObject)res).deserialize()
+ : res;
+ }
else
throw new BinaryObjectException("Custom objects are not
supported");
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeGetRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeGetRequest.java
new file mode 100644
index 0000000..1a523e0
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeGetRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC get binary type metadata request.
+ */
+public class JdbcBinaryTypeGetRequest extends JdbcRequest {
+ /** ID of binary type. */
+ private int typeId;
+
+ /**
+ * Default constructor for deserialization purpose.
+ */
+ JdbcBinaryTypeGetRequest() {
+ super(BINARY_TYPE_GET);
+ }
+
+ /**
+ * @param typeId ID of binary type.
+ */
+ public JdbcBinaryTypeGetRequest(int typeId) {
+ super(BINARY_TYPE_GET);
+
+ this.typeId = typeId;
+ }
+
+ /**
+ * @return ID of binary type.
+ */
+ public int typeId() {
+ return typeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer,
JdbcProtocolContext protoCtx)
+ throws BinaryObjectException {
+ super.writeBinary(writer, protoCtx);
+
+ writer.writeInt(typeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader,
JdbcProtocolContext protoCtx)
+ throws BinaryObjectException {
+ super.readBinary(reader, protoCtx);
+
+ typeId = reader.readInt();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcBinaryTypeGetRequest.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeGetResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeGetResult.java
new file mode 100644
index 0000000..94e66fc
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeGetResult.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import java.io.IOException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC get binary type metadata result.
+ */
+public class JdbcBinaryTypeGetResult extends JdbcResult {
+ /** ID of initial request. */
+ private long reqId;
+
+ /** Binary type metadata. */
+ private BinaryMetadata meta;
+
+ /** Default constructor for deserialization purpose. */
+ JdbcBinaryTypeGetResult() {
+ super(BINARY_TYPE_GET);
+ }
+
+ /**
+ * @param reqId ID of initial request.
+ * @param meta Metadata of binary type.
+ */
+ public JdbcBinaryTypeGetResult(long reqId, BinaryMetadata meta) {
+ super(BINARY_TYPE_GET);
+
+ this.reqId = reqId;
+ this.meta = meta;
+ }
+
+ /**
+ * Returns metadata of binary type.
+ *
+ * @return Metadata of binary type.
+ */
+ public BinaryMetadata meta() {
+ return meta;
+ }
+
+ /**
+ * Returns ID of initial request.
+ *
+ * @return ID of initial request.
+ */
+ public long reqId() {
+ return reqId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer,
JdbcProtocolContext protoCtx)
+ throws BinaryObjectException {
+ super.writeBinary(writer, protoCtx);
+
+ writer.writeLong(reqId);
+
+ try {
+ meta.writeTo(writer);
+ }
+ catch (IOException e) {
+ throw new BinaryObjectException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader,
JdbcProtocolContext protoCtx)
+ throws BinaryObjectException {
+ super.readBinary(reader, protoCtx);
+
+ reqId = reader.readLong();
+ meta = new BinaryMetadata();
+
+ try {
+ meta.readFrom(reader);
+ }
+ catch (IOException e) {
+ throw new BinaryObjectException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcBinaryTypeGetResult.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeNameGetRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeNameGetRequest.java
new file mode 100644
index 0000000..5cc4272
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeNameGetRequest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.MarshallerPlatformIds;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC get binary type name request.
+ */
+public class JdbcBinaryTypeNameGetRequest extends JdbcRequest {
+ /** ID of binary type. */
+ private int typeId;
+
+ /** ID of platform. */
+ private byte platformId;
+
+ /** Default constructor for deserialization purpose. */
+ JdbcBinaryTypeNameGetRequest() {
+ super(BINARY_TYPE_NAME_GET);
+ }
+
+ /**
+ * @param typeId ID of binary type.
+ * @param platformId ID of platform. See {@link MarshallerPlatformIds} for
supported values.
+ */
+ public JdbcBinaryTypeNameGetRequest(int typeId, byte platformId) {
+ super(BINARY_TYPE_NAME_GET);
+
+ this.typeId = typeId;
+ this.platformId = platformId;
+ }
+
+ /**
+ * Returns ID of binary type.
+ *
+ * @return ID of binary type.
+ */
+ public int typeId() {
+ return typeId;
+ }
+
+ /**
+ * Returns ID of platform.
+ *
+ * @return ID of platform.
+ */
+ public byte platformId() {
+ return platformId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer,
JdbcProtocolContext protoCtx)
+ throws BinaryObjectException {
+ super.writeBinary(writer, protoCtx);
+
+ writer.writeInt(typeId);
+ writer.writeByte(platformId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader,
JdbcProtocolContext protoCtx)
+ throws BinaryObjectException {
+ super.readBinary(reader, protoCtx);
+
+ typeId = reader.readInt();
+ platformId = reader.readByte();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcBinaryTypeNameGetRequest.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeNameGetResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeNameGetResult.java
new file mode 100644
index 0000000..ab66ded
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeNameGetResult.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC get binary type name result.
+ */
+public class JdbcBinaryTypeNameGetResult extends JdbcResult {
+ /** ID of initial request. */
+ private long reqId;
+
+ /** Type name. */
+ private String typeName;
+
+ /** Default constructor for deserialization purpose. */
+ JdbcBinaryTypeNameGetResult() {
+ super(BINARY_TYPE_NAME_GET);
+ }
+
+ /**
+ * @param reqId ID of initial request.
+ * @param typeName Name of the binary type.
+ */
+ public JdbcBinaryTypeNameGetResult(long reqId, String typeName) {
+ super(BINARY_TYPE_NAME_GET);
+
+ this.reqId = reqId;
+ this.typeName = typeName;
+ }
+
+ /**
+ * Returns name of the binary type.
+ *
+ * @return Name of the binary type.
+ */
+ public String typeName() {
+ return typeName;
+ }
+
+ /**
+ * Returns ID of initial request.
+ *
+ * @return ID of initial request.
+ */
+ public long reqId() {
+ return reqId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer,
JdbcProtocolContext protoCtx)
+ throws BinaryObjectException {
+ super.writeBinary(writer, protoCtx);
+
+ writer.writeLong(reqId);
+ writer.writeString(typeName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader,
JdbcProtocolContext protoCtx)
+ throws BinaryObjectException {
+ super.readBinary(reader, protoCtx);
+
+ reqId = reader.readLong();
+ typeName = reader.readString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcBinaryTypeNameGetResult.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeNamePutRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeNamePutRequest.java
new file mode 100644
index 0000000..769389d
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypeNamePutRequest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.MarshallerPlatformIds;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC put binary type name request.
+ */
+public class JdbcBinaryTypeNamePutRequest extends JdbcRequest {
+ /** ID of binary type. */
+ private int typeId;
+
+ /** ID of platform. */
+ private byte platformId;
+
+ /** Type name. */
+ private String typeName;
+
+ /** Default constructor for deserialization purpose. */
+ JdbcBinaryTypeNamePutRequest() {
+ super(BINARY_TYPE_NAME_PUT);
+ }
+
+ /**
+ * @param typeId ID of binary type.
+ * @param platformId ID of platform. See {@link MarshallerPlatformIds} for
supported values.
+ * @param typeName Name of the new binary type.
+ */
+ public JdbcBinaryTypeNamePutRequest(int typeId, byte platformId, String
typeName) {
+ super(BINARY_TYPE_NAME_PUT);
+
+ this.typeId = typeId;
+ this.platformId = platformId;
+ this.typeName = typeName;
+ }
+
+ /**
+ * Returns ID of binary type.
+ *
+ * @return ID of binary type.
+ */
+ public int typeId() {
+ return typeId;
+ }
+
+ /**
+ * Returns ID of platform.
+ *
+ * @return ID of platform.
+ */
+ public byte platformId() {
+ return platformId;
+ }
+
+ /**
+ * Returns type name.
+ *
+ * @return Type name.
+ */
+ public String typeName() {
+ return typeName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer,
JdbcProtocolContext protoCtx)
+ throws BinaryObjectException {
+ super.writeBinary(writer, protoCtx);
+
+ writer.writeInt(typeId);
+ writer.writeByte(platformId);
+ writer.writeString(typeName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader,
JdbcProtocolContext protoCtx)
+ throws BinaryObjectException {
+ super.readBinary(reader, protoCtx);
+
+ typeId = reader.readInt();
+ platformId = reader.readByte();
+ typeName = reader.readString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcBinaryTypeNamePutRequest.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypePutRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypePutRequest.java
new file mode 100644
index 0000000..630f24f
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBinaryTypePutRequest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import java.io.IOException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC put binary type metadata request.
+ */
+public class JdbcBinaryTypePutRequest extends JdbcRequest {
+ /** Metadata of binary type. */
+ private BinaryMetadata meta;
+
+ /** Default constructor for deserialization purpose. */
+ JdbcBinaryTypePutRequest() {
+ super(BINARY_TYPE_PUT);
+ }
+
+ /**
+ * @param meta Metadata of binary type.
+ */
+ public JdbcBinaryTypePutRequest(BinaryMetadata meta) {
+ super(BINARY_TYPE_PUT);
+
+ this.meta = meta;
+ }
+
+ /**
+ * Returns metadata of binary type.
+ *
+ * @return Metadata of binary type.
+ */
+ public BinaryMetadata meta() {
+ return meta;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer,
JdbcProtocolContext protoCtx) throws BinaryObjectException {
+ super.writeBinary(writer, protoCtx);
+
+ try {
+ meta.writeTo(writer);
+ }
+ catch (IOException e) {
+ throw new BinaryObjectException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader,
JdbcProtocolContext protoCtx) throws BinaryObjectException {
+ super.readBinary(reader, protoCtx);
+
+ meta = new BinaryMetadata();
+
+ try {
+ meta.readFrom(reader);
+ }
+ catch (IOException e) {
+ throw new BinaryObjectException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcBinaryTypePutRequest.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 705bf22..0f8fdc1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -206,7 +206,7 @@ public class JdbcConnectionContext extends
ClientListenerAbstractConnectionConte
actx = authenticate(ses.certificates(), user, passwd);
}
- protoCtx = new JdbcProtocolContext(ver, features);
+ protoCtx = new JdbcProtocolContext(ver, features, true);
parser = new JdbcMessageParser(ctx, protoCtx);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
index e73838f..fdf7a12 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
@@ -18,11 +18,14 @@
package org.apache.ignite.internal.processors.odbc.jdbc;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryThreadLocalContext;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
@@ -40,6 +43,9 @@ public class JdbcMessageParser implements
ClientListenerMessageParser {
/** Initial output stream capacity. */
protected static final int INIT_CAP = 1024;
+ /** Binary context. */
+ private BinaryContext binCtx;
+
/**
* @param ctx Context.
* @param protoCtx Protocol context.
@@ -47,6 +53,7 @@ public class JdbcMessageParser implements
ClientListenerMessageParser {
public JdbcMessageParser(GridKernalContext ctx, JdbcProtocolContext
protoCtx) {
this.ctx = ctx;
this.protoCtx = protoCtx;
+ this.binCtx =
((CacheObjectBinaryProcessorImpl)ctx.cacheObjects()).marshaller().context();
}
/**
@@ -56,7 +63,7 @@ public class JdbcMessageParser implements
ClientListenerMessageParser {
protected BinaryReaderExImpl createReader(byte[] msg) {
BinaryInputStream stream = new BinaryHeapInputStream(msg);
- return new BinaryReaderExImpl(null, stream,
ctx.config().getClassLoader(), true);
+ return new BinaryReaderExImpl(binCtx, stream,
ctx.config().getClassLoader(), true);
}
/**
@@ -64,7 +71,8 @@ public class JdbcMessageParser implements
ClientListenerMessageParser {
* @return Writer.
*/
protected BinaryWriterExImpl createWriter(int cap) {
- return new BinaryWriterExImpl(null, new BinaryHeapOutputStream(cap),
null, null);
+ return new BinaryWriterExImpl(binCtx, new BinaryHeapOutputStream(cap),
+ BinaryThreadLocalContext.get().schemaHolder(), null);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcProtocolContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcProtocolContext.java
index 01a5388..f748caa 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcProtocolContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcProtocolContext.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.odbc.jdbc;
import java.util.EnumSet;
+import java.util.Objects;
import
org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_4_0;
@@ -34,13 +35,21 @@ public class JdbcProtocolContext {
/** Features. */
private final EnumSet<JdbcThinFeature> features;
+ /** {@code true} if binary should not be deserialized. */
+ private final boolean keepBinary;
+
/**
* @param ver Protocol version.
* @param features Supported features.
+ * @param keepBinary Wether to keep objects in binary form.
*/
- public JdbcProtocolContext(ClientListenerProtocolVersion ver,
EnumSet<JdbcThinFeature> features) {
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public JdbcProtocolContext(ClientListenerProtocolVersion ver,
EnumSet<JdbcThinFeature> features, boolean keepBinary) {
+ assert Objects.nonNull(features);
+
this.ver = ver;
this.features = features;
+ this.keepBinary = keepBinary;
}
/**
@@ -72,9 +81,24 @@ public class JdbcProtocolContext {
}
/**
+ * @param feature {@code true} if given feature supported.
+ */
+ public boolean isFeatureSupported(JdbcThinFeature feature) {
+ return features.contains(feature);
+ }
+
+ /**
* @return Supported features.
*/
- public EnumSet<JdbcThinFeature> features() {
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ EnumSet<JdbcThinFeature> features() {
return features;
}
+
+ /**
+ * @return {@code true} if binary should not be deserialized.
+ */
+ public boolean keepBinary() {
+ return keepBinary;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java
index 281b2f27..c2390e9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java
@@ -75,7 +75,7 @@ public class JdbcQuery implements JdbcRawBinarylizable {
writer.writeInt(args.length);
for (Object arg : args)
- SqlListenerUtils.writeObject(writer, arg, false);
+ SqlListenerUtils.writeObject(writer, arg,
protoCtx.isFeatureSupported(JdbcThinFeature.CUSTOM_OBJECT));
}
}
@@ -91,7 +91,8 @@ public class JdbcQuery implements JdbcRawBinarylizable {
args = new Object[argsNum];
for (int i = 0; i < argsNum; ++i)
- args[i] = SqlListenerUtils.readObject(reader, false);
+ args[i] = SqlListenerUtils.readObject(reader,
protoCtx.isFeatureSupported(JdbcThinFeature.CUSTOM_OBJECT),
+ protoCtx.keepBinary());
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteRequest.java
index 9d390be..de07a69 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteRequest.java
@@ -151,7 +151,7 @@ public class JdbcQueryExecuteRequest extends JdbcRequest {
if (args != null) {
for (Object arg : args)
- JdbcUtils.writeObject(writer, arg, false, protoCtx);
+ JdbcUtils.writeObject(writer, arg, protoCtx);
}
if (protoCtx.isAutoCommitSupported())
@@ -180,7 +180,7 @@ public class JdbcQueryExecuteRequest extends JdbcRequest {
args = new Object[argsNum];
for (int i = 0; i < argsNum; ++i)
- args[i] = JdbcUtils.readObject(reader, false, protoCtx);
+ args[i] = JdbcUtils.readObject(reader, protoCtx);
if (protoCtx.isAutoCommitSupported())
autoCommit = reader.readBoolean();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
index 029e7a0..34c6882 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
@@ -76,6 +76,18 @@ public class JdbcRequest extends ClientListenerRequestNoId
implements JdbcRawBin
/** Get cache partitions distributions. */
public static final byte CACHE_PARTITIONS = 16;
+ /** Get binary type schema request. */
+ public static final byte BINARY_TYPE_GET = 17;
+
+ /** Update binary type schema request. */
+ public static final byte BINARY_TYPE_PUT = 18;
+
+ /** Get binary type name request. */
+ public static final byte BINARY_TYPE_NAME_GET = 19;
+
+ /** Update binary type name request. */
+ public static final byte BINARY_TYPE_NAME_PUT = 20;
+
/** Request Id generator. */
private static final AtomicLong REQ_ID_GENERATOR = new AtomicLong();
@@ -217,6 +229,26 @@ public class JdbcRequest extends ClientListenerRequestNoId
implements JdbcRawBin
break;
+ case BINARY_TYPE_NAME_PUT:
+ req = new JdbcBinaryTypeNamePutRequest();
+
+ break;
+
+ case BINARY_TYPE_NAME_GET:
+ req = new JdbcBinaryTypeNameGetRequest();
+
+ break;
+
+ case BINARY_TYPE_PUT:
+ req = new JdbcBinaryTypePutRequest();
+
+ break;
+
+ case BINARY_TYPE_GET:
+ req = new JdbcBinaryTypeGetRequest();
+
+ break;
+
default:
throw new IgniteException("Unknown SQL listener request ID:
[request ID=" + reqType + ']');
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 4e3fa01..a2270c8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -41,6 +41,8 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteVersionUtils;
import org.apache.ignite.internal.ThinProtocolFeature;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import
org.apache.ignite.internal.jdbc.thin.JdbcThinPartitionAwarenessMappingGroup;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
@@ -51,6 +53,7 @@ import
org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
@@ -73,6 +76,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.marshaller.MarshallerContext;
import org.apache.ignite.transactions.TransactionAlreadyCompletedException;
import org.apache.ignite.transactions.TransactionDuplicateKeyException;
import org.apache.ignite.transactions.TransactionMixedModeException;
@@ -90,6 +94,10 @@ import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionCont
import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_8_1;
import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC;
import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC_ORDERED;
+import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BINARY_TYPE_GET;
+import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BINARY_TYPE_NAME_GET;
+import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BINARY_TYPE_NAME_PUT;
+import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BINARY_TYPE_PUT;
import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BULK_LOAD_BATCH;
import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.CACHE_PARTITIONS;
import static
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_COLUMNS;
@@ -368,6 +376,22 @@ public class JdbcRequestHandler implements
ClientListenerRequestHandler {
resp = getCachePartitions((JdbcCachePartitionsRequest)req);
break;
+ case BINARY_TYPE_NAME_PUT:
+ resp =
registerBinaryType((JdbcBinaryTypeNamePutRequest)req);
+ break;
+
+ case BINARY_TYPE_NAME_GET:
+ resp =
getBinaryTypeName((JdbcBinaryTypeNameGetRequest)req);
+ break;
+
+ case BINARY_TYPE_PUT:
+ resp = putBinaryType((JdbcBinaryTypePutRequest)req);
+ break;
+
+ case BINARY_TYPE_GET:
+ resp = getBinaryType((JdbcBinaryTypeGetRequest)req);
+ break;
+
default:
resp = new
JdbcResponse(IgniteQueryErrorCode.UNSUPPORTED_OPERATION,
"Unsupported JDBC request [req=" + req + ']');
@@ -1124,6 +1148,100 @@ public class JdbcRequestHandler implements
ClientListenerRequestHandler {
}
/**
+ * Handler for updating binary type requests.
+ *
+ * @param req Incoming request.
+ * @return Acknowledgement in case of successful updating.
+ */
+ private JdbcResponse putBinaryType(JdbcBinaryTypePutRequest req) {
+ try {
+ getBinaryCtx().updateMetadata(req.meta().typeId(), req.meta(),
false);
+
+ return resultToResonse(new
JdbcUpdateBinarySchemaResult(req.requestId(), true));
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to update binary schema [reqId=" +
req.requestId() + ", req=" + req + ']', e);
+
+ return exceptionToResult(e);
+ }
+ }
+
+ /**
+ * Handler for querying binary type requests.
+ *
+ * @param req Incoming request.
+ * @return Response with binary type schema.
+ */
+ private JdbcResponse getBinaryType(JdbcBinaryTypeGetRequest req) {
+ try {
+ BinaryTypeImpl type =
(BinaryTypeImpl)connCtx.kernalContext().cacheObjects().binary().type(req.typeId());
+
+ return resultToResonse(new
JdbcBinaryTypeGetResult(req.requestId(), type != null ? type.metadata() :
null));
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to get binary type name [reqId=" +
req.requestId() + ", req=" + req + ']', e);
+
+ return exceptionToResult(e);
+ }
+ }
+
+ /**
+ * Handler for querying binary type name requests.
+ *
+ * @param req Incoming request.
+ * @return Response with binary type name.
+ */
+ private JdbcResponse getBinaryTypeName(JdbcBinaryTypeNameGetRequest req) {
+ try {
+ String name = getMarshallerCtx().getClassName(req.platformId(),
req.typeId());
+
+ return resultToResonse(new
JdbcBinaryTypeNameGetResult(req.requestId(), name));
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to get binary type name [reqId=" +
req.requestId() + ", req=" + req + ']', e);
+
+ return exceptionToResult(e);
+ }
+ }
+
+ /**
+ * Handler for register new binary type requests.
+ *
+ * @param req Incoming request.
+ * @return Acknowledgement in case of successful registration.
+ */
+ private JdbcResponse registerBinaryType(JdbcBinaryTypeNamePutRequest req) {
+ try {
+ boolean res =
getMarshallerCtx().registerClassName(req.platformId(), req.typeId(),
req.typeName(), false);
+
+ return resultToResonse(new
JdbcUpdateBinarySchemaResult(req.requestId(), res));
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to register new type [reqId=" +
req.requestId() + ", req=" + req + ']', e);
+
+ return exceptionToResult(e);
+ }
+ }
+
+ /**
+ * Get marshaller context from connection context.
+ *
+ * @return Marshaller context.
+ */
+ private MarshallerContext getMarshallerCtx() {
+ return connCtx.kernalContext().marshallerContext();
+ }
+
+ /**
+ * Get binary context from connection context.
+ *
+ * @return Binary context.
+ */
+ private BinaryContext getBinaryCtx() {
+ return
((CacheObjectBinaryProcessorImpl)connCtx.kernalContext().cacheObjects()).binaryContext();
+ }
+
+ /**
* @param req Request.
* @return Response.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
index f8b3f9e..b149366 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
@@ -78,6 +78,15 @@ public class JdbcResult implements JdbcRawBinarylizable {
/** A result of the processing cache partitions distributions request. */
static final byte CACHE_PARTITIONS = 19;
+ /** A result of the successfully updated binary schema. */
+ static final byte UPDATE_BINARY_SCHEMA_ACK = 20;
+
+ /** Get binary type schema result. */
+ static final byte BINARY_TYPE_GET = 21;
+
+ /** Get binary type name result. */
+ static final byte BINARY_TYPE_NAME_GET = 22;
+
/** Success status. */
private byte type;
@@ -205,6 +214,21 @@ public class JdbcResult implements JdbcRawBinarylizable {
break;
+ case UPDATE_BINARY_SCHEMA_ACK:
+ res = new JdbcUpdateBinarySchemaResult();
+
+ break;
+
+ case BINARY_TYPE_GET:
+ res = new JdbcBinaryTypeGetResult();
+
+ break;
+
+ case BINARY_TYPE_NAME_GET:
+ res = new JdbcBinaryTypeNameGetResult();
+
+ break;
+
default:
throw new IgniteException("Unknown SQL listener request ID:
[request ID=" + resId + ']');
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcThinFeature.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcThinFeature.java
index 5f71bf3..572dcf1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcThinFeature.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcThinFeature.java
@@ -24,7 +24,13 @@ import org.apache.ignite.internal.ThinProtocolFeature;
* Defines supported features for JDBC thin client.
*/
public enum JdbcThinFeature implements ThinProtocolFeature {
- RESERVED(0);
+ /** */
+ RESERVED(0),
+
+ /**
+ * Whether to allow sending custom object through Thin JDBC protocol.
+ */
+ CUSTOM_OBJECT(1);
/** */
private static final EnumSet<JdbcThinFeature> ALL_FEATURES_AS_ENUM_SET =
EnumSet.allOf(JdbcThinFeature.class);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUpdateBinarySchemaResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUpdateBinarySchemaResult.java
new file mode 100644
index 0000000..e1e20cf
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUpdateBinarySchemaResult.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * JDBC update binary schema result.
+ */
+public class JdbcUpdateBinarySchemaResult extends JdbcResult {
+ /** ID of initial request. */
+ private long reqId;
+
+ /** Status of updating. */
+ private boolean success;
+
+ /** Default constructor for deserialization purpose. */
+ JdbcUpdateBinarySchemaResult() {
+ super(UPDATE_BINARY_SCHEMA_ACK);
+ }
+
+ /**
+ * @param reqId ID of initial request.
+ * @param success Status of updating (true if success).
+ */
+ public JdbcUpdateBinarySchemaResult(long reqId, boolean success) {
+ super(UPDATE_BINARY_SCHEMA_ACK);
+
+ this.reqId = reqId;
+ this.success = success;
+ }
+
+ /**
+ * Returns status of updating.
+ *
+ * @return Status of updating.
+ */
+ public boolean success() {
+ return success;
+ }
+
+ /**
+ * Returns ID of initial operation.
+ *
+ * @return Id of initial request.
+ */
+ public long reqId() {
+ return reqId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriterExImpl writer,
JdbcProtocolContext protoCtx) throws BinaryObjectException {
+ super.writeBinary(writer, protoCtx);
+
+ writer.writeLong(reqId);
+ writer.writeBoolean(success);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReaderExImpl reader,
JdbcProtocolContext protoCtx) throws BinaryObjectException {
+ super.readBinary(reader, protoCtx);
+
+ reqId = reader.readLong();
+ success = reader.readBoolean();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JdbcUpdateBinarySchemaResult.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
index 37c8591..6a617c1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
@@ -44,7 +44,7 @@ public class JdbcUtils {
writer.writeInt(row.size());
for (Object obj : row)
- writeObject(writer, obj, false, protoCtx);
+ writeObject(writer, obj, protoCtx);
}
}
}
@@ -65,7 +65,7 @@ public class JdbcUtils {
List<Object> col = new ArrayList<>(colsSize);
for (int colCnt = 0; colCnt < colsSize; ++colCnt)
- col.add(readObject(reader, false, protoCtx));
+ col.add(readObject(reader, protoCtx));
items.add(col);
}
@@ -134,31 +134,35 @@ public class JdbcUtils {
/**
* @param reader Reader.
- * @param binObjAllow Allow to read non plaint objects.
* @param protoCtx Protocol context.
* @return Read object.
* @throws BinaryObjectException On error.
*/
- @Nullable public static Object readObject(BinaryReaderExImpl reader,
boolean binObjAllow,
- JdbcProtocolContext protoCtx) throws BinaryObjectException {
- return SqlListenerUtils.readObject(reader.readByte(), reader,
binObjAllow);
+ @Nullable public static Object readObject(
+ BinaryReaderExImpl reader,
+ JdbcProtocolContext protoCtx
+ ) throws BinaryObjectException {
+ return SqlListenerUtils.readObject(reader.readByte(), reader,
+ protoCtx.isFeatureSupported(JdbcThinFeature.CUSTOM_OBJECT),
protoCtx.keepBinary());
}
/**
* @param writer Writer.
* @param obj Object to write.
- * @param binObjAllow Allow to write non plain objects.
* @param protoCtx Protocol context.
* @throws BinaryObjectException On error.
*/
- public static void writeObject(BinaryWriterExImpl writer, @Nullable Object
obj, boolean binObjAllow,
- JdbcProtocolContext protoCtx) throws BinaryObjectException {
+ public static void writeObject(
+ BinaryWriterExImpl writer,
+ @Nullable Object obj,
+ JdbcProtocolContext protoCtx
+ ) throws BinaryObjectException {
if (obj == null) {
writer.writeByte(GridBinaryMarshaller.NULL);
return;
}
- SqlListenerUtils.writeObject(writer, obj, binObjAllow);
+ SqlListenerUtils.writeObject(writer, obj,
protoCtx.isFeatureSupported(JdbcThinFeature.CUSTOM_OBJECT));
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 1fa5fe1..2b4b59e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -889,7 +889,7 @@ public class PlatformUtils {
marsh.setContext(new MarshallerContextImpl(null, null));
- ctx.configure(marsh, new IgniteConfiguration());
+ ctx.configure(marsh);
return new GridBinaryMarshaller(ctx);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 43bc9de..30d220f 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -340,7 +340,7 @@ public class IgniteMock implements Ignite {
};
if (marshaller instanceof BinaryMarshaller)
- ctx.configure((BinaryMarshaller)marshaller, configuration());
+ ctx.configure((BinaryMarshaller)marshaller,
configuration().getBinaryConfiguration());
}
binaryMock = new NoOpBinary() {