This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 05f48742673 IGNITE-23114 JDBC thin: add support for stream APIs for BLOB (#11518)
05f48742673 is described below

commit 05f48742673cc0888a41c5798b0cde1fa6a3f61e
Author: Sergey Korotkov <[email protected]>
AuthorDate: Fri Nov 15 17:17:33 2024 +0700

    IGNITE-23114 JDBC thin: add support for stream APIs for BLOB (#11518)
---
 .../internal/jdbc2/JdbcBinaryBufferTest.java       |  55 +++
 .../apache/ignite/internal/jdbc2/JdbcBlobTest.java | 358 ++++++++++++++++-
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java      |   2 +
 .../ignite/jdbc/thin/JdbcThinBatchSelfTest.java    |  27 +-
 .../thin/JdbcThinPreparedStatementSelfTest.java    | 243 ++++++++++--
 .../jdbc/thin/JdbcThinResultSetSelfTest.java       | 206 +++++++++-
 .../ignite/internal/binary/BinaryWriterExImpl.java |  64 +++
 .../binary/streams/BinaryAbstractOutputStream.java |   2 +-
 .../internal/jdbc/thin/JdbcThinConnection.java     |   2 +-
 .../jdbc/thin/JdbcThinPreparedStatement.java       |  36 +-
 .../internal/jdbc/thin/JdbcThinResultSet.java      |  49 ++-
 .../ignite/internal/jdbc2/JdbcBinaryBuffer.java    | 440 +++++++++++++++++++++
 .../org/apache/ignite/internal/jdbc2/JdbcBlob.java | 195 ++++++---
 .../processors/odbc/SqlInputStreamWrapper.java     |  63 +++
 .../internal/processors/odbc/SqlListenerUtils.java |  89 ++++-
 .../internal/processors/odbc/jdbc/JdbcUtils.java   |  21 +-
 16 files changed, 1701 insertions(+), 151 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBufferTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBufferTest.java
new file mode 100644
index 00000000000..ea6f469ea47
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBufferTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.io.IOException;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
+import static org.apache.ignite.internal.jdbc2.JdbcBinaryBuffer.MIN_CAP;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.junit.Assert.assertEquals;
+
+/** */
+public class JdbcBinaryBufferTest {
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCapacity() throws Exception {
+        assertEquals(MIN_CAP, JdbcBinaryBuffer.capacity(10, 20));
+
+        assertEquals(MAX_ARRAY_SIZE, JdbcBinaryBuffer.capacity(10, 
MAX_ARRAY_SIZE));
+
+        assertEquals(MIN_CAP * 16, JdbcBinaryBuffer.capacity(MIN_CAP, MIN_CAP 
* 10));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testWriteTooMuchData() throws Exception {
+        JdbcBinaryBuffer buf = JdbcBinaryBuffer.createReadWrite(new byte[10]);
+
+        assertThrows(null, () -> {
+            buf.write(MAX_ARRAY_SIZE, 1);
+
+            return null;
+        }, IOException.class, "Too much data. Can't write more then 2147483639 
bytes.");
+    }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBlobTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBlobTest.java
index 0b0119e48e9..3bd494e02af 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBlobTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBlobTest.java
@@ -19,16 +19,24 @@ package org.apache.ignite.internal.jdbc2;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Blob;
 import java.sql.SQLException;
 import java.util.Arrays;
 import org.junit.Test;
 
+import static 
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /** */
 public class JdbcBlobTest {
+    /** */
+    static final String ERROR_BLOB_FREE = "Blob instance can't be used after 
free() has been called.";
+
     /**
      * @throws Exception If failed.
      */
@@ -155,6 +163,21 @@ public class JdbcBlobTest {
         }
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testGetBinaryStreamReadMoreThenBlobSize() throws Exception {
+        byte[] arr = new byte[] {1, 2, 3, 4, 5};
+
+        JdbcBlob blob = new JdbcBlob(arr);
+
+        InputStream is = blob.getBinaryStream();
+        byte[] res = new byte[7];
+        assertEquals(5, is.read(res, 0, 7));
+        assertArrayEquals(new byte[] {1, 2, 3, 4, 5, 0, 0}, res);
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -228,6 +251,145 @@ public class JdbcBlobTest {
         }
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testGetBinaryStreamWithParamsReadMoreThenStreamLimit() throws 
Exception {
+        byte[] arr = new byte[] {1, 2, 3, 4, 5};
+
+        JdbcBlob blob = new JdbcBlob(arr);
+
+        InputStream is = blob.getBinaryStream(2, 3);
+        byte[] res = new byte[6];
+        assertEquals(3, is.read(res, 1, 5));
+        assertArrayEquals(new byte[] {0, 2, 3, 4, 0, 0}, res);
+        assertEquals(-1, is.read(res, 0, 1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testGetBinaryStreamReadFromTruncated() throws Exception {
+        byte[] arr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
+
+        JdbcBlob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(arr, 0, 
arr.length));
+
+        InputStream is = blob.getBinaryStream(2, 4);
+
+        assertEquals(1, is.read());
+
+        blob.truncate(6);
+
+        assertEquals(2, is.read());
+
+        blob.truncate(2);
+
+        assertEquals(-1, is.read());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testGetBinaryStreamMarkSkipReset() throws Exception {
+        byte[] arr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8};
+
+        JdbcBlob blob = new JdbcBlob(JdbcBinaryBuffer.createReadWrite(arr));
+
+        InputStream is = blob.getBinaryStream(2, arr.length - 2);
+
+        assertEquals(1, is.read());
+        is.reset();
+        assertEquals(1, is.read());
+
+        assertEquals(2, is.read());
+        is.mark(1);
+        assertEquals(3, is.read());
+        assertEquals(4, is.read());
+        is.reset();
+        assertEquals(3, is.read());
+
+        assertEquals(0, is.skip(-1));
+        assertEquals(0, is.skip(0));
+        assertEquals(1, is.skip(1));
+        assertEquals(2, is.skip(2));
+        assertEquals(7, is.read());
+
+        is.reset();
+        assertEquals(3, is.read());
+
+        assertEquals(4, is.skip(100));
+        assertEquals(-1, is.read());
+
+        is.reset();
+        assertEquals(5, is.skip(Long.MAX_VALUE));
+        assertEquals(-1, is.read());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testGetBinaryStreamSeeChangesDoneAfterCreate() throws 
Exception {
+        byte[] arr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8};
+
+        JdbcBlob blob = new JdbcBlob(JdbcBinaryBuffer.createReadWrite(arr));
+
+        InputStream is = blob.getBinaryStream();
+
+        assertEquals(0, is.read());
+        assertEquals(1, is.read());
+
+        OutputStream os = blob.setBinaryStream(3);
+        os.write(11);
+
+        assertEquals(11, is.read());
+
+        blob.setBytes(4, new byte[] {12});
+
+        assertEquals(12, is.read());
+
+        byte[] res = is.readAllBytes();
+        assertArrayEquals(new byte[] {4, 5, 6, 7, 8}, res);
+
+        blob.setBytes(blob.length() + 1, new byte[] {13, 14, 15});
+        assertEquals(13, is.read());
+        assertEquals(14, is.read());
+        assertEquals(15, is.read());
+        assertEquals(-1, is.read());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testGetBinaryStreamWithParamsSeeChangesDoneAfterCreate() 
throws Exception {
+        byte[] arr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8};
+
+        JdbcBlob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(arr, 0, 
arr.length));
+
+        InputStream is = blob.getBinaryStream(2, arr.length - 2);
+
+        assertEquals(1, is.read());
+
+        OutputStream os = blob.setBinaryStream(3);
+        os.write(11);
+
+        assertEquals(11, is.read());
+
+        blob.setBytes(4, new byte[] {12});
+
+        assertEquals(12, is.read());
+
+        byte[] res = is.readNBytes(4);
+        assertArrayEquals(new byte[] {4, 5, 6, 7}, res);
+
+        blob.setBytes(blob.length() + 1, new byte[] {13});
+        assertEquals(-1, is.read());
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -237,7 +399,7 @@ public class JdbcBlobTest {
 
         JdbcBlob blob = new JdbcBlob(arr);
 
-        assertEquals(-1, blob.position(new byte[] {1, 2, 3}, 0));
+        assertThrows(null, () -> blob.position(new byte[] {1, 2, 3}, 0), 
SQLException.class, null);
         assertEquals(-1, blob.position(new byte[] {1, 2, 3}, arr.length + 1));
         assertEquals(-1, blob.position(new byte[0], 1));
         assertEquals(-1, blob.position(new byte[17], 1));
@@ -251,6 +413,9 @@ public class JdbcBlobTest {
         assertEquals(-1, blob.position(new byte[] {0, 2, 3}, 1));
         assertEquals(-1, blob.position(new byte[] {1, 2, 4}, 1));
 
+        blob.setBytes(17, new byte[] {16, 16, 16, 33, 46});
+        assertEquals(18, blob.position(new byte[] {16, 16, 33}, 1));
+
         blob.free();
 
         try {
@@ -272,7 +437,7 @@ public class JdbcBlobTest {
 
         JdbcBlob blob = new JdbcBlob(arr);
 
-        assertEquals(-1, blob.position(new JdbcBlob(new byte[] {1, 2, 3}), 0));
+        assertThrows(null, () -> blob.position(new JdbcBlob(new byte[] {1, 2, 
3}), 0), SQLException.class, null);
         assertEquals(-1, blob.position(new JdbcBlob(new byte[] {1, 2, 3}), 
arr.length + 1));
         assertEquals(-1, blob.position(new JdbcBlob(new byte[0]), 1));
         assertEquals(-1, blob.position(new JdbcBlob(new byte[17]), 1));
@@ -286,6 +451,9 @@ public class JdbcBlobTest {
         assertEquals(-1, blob.position(new JdbcBlob(new byte[] {0, 2, 3}), 1));
         assertEquals(-1, blob.position(new JdbcBlob(new byte[] {1, 2, 4}), 1));
 
+        blob.setBytes(17, new byte[] {16, 16, 16, 33, 46});
+        assertEquals(18, blob.position(new JdbcBlob(new byte[] {16, 16, 33}), 
1));
+
         blob.free();
 
         try {
@@ -321,7 +489,7 @@ public class JdbcBlobTest {
 
             fail();
         }
-        catch (ArrayIndexOutOfBoundsException e) {
+        catch (SQLException e) {
             // No-op.
         }
 
@@ -350,6 +518,52 @@ public class JdbcBlobTest {
         }
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSetBytesRO() throws Exception {
+        byte[] roArr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
+
+        Blob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(roArr, 2, 4));
+
+        blob.setBytes(2, new byte[] {11, 22});
+
+        assertArrayEquals(new byte[] {2, 11, 22, 5}, blob.getBytes(1, 
(int)blob.length()));
+
+        assertArrayEquals(new byte[] {0, 1, 2, 3, 4, 5, 6, 7}, roArr);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSetBytesRealloc() throws Exception {
+        byte[] roArr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
+
+        Blob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(roArr, 2, 4));
+
+        blob.setBytes(5, new byte[JdbcBinaryBuffer.MIN_CAP]);
+
+        assertEquals(JdbcBinaryBuffer.MIN_CAP + 4, blob.length());
+
+        assertArrayEquals(new byte[] {0, 1, 2, 3, 4, 5, 6, 7}, roArr);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSetBytesTooMuchData() throws Exception {
+        Blob blob = new JdbcBlob();
+
+        blob.setBytes(1, new byte[1]);
+
+        // Use fake one byte array.
+        assertThrows(null, () -> blob.setBytes(2, new byte[1], 0, 
MAX_ARRAY_SIZE),
+                SQLException.class, "Too much data. Can't write more then 
2147483639 bytes.");
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -373,7 +587,7 @@ public class JdbcBlobTest {
 
             fail();
         }
-        catch (ArrayIndexOutOfBoundsException e) {
+        catch (SQLException e) {
             // No-op.
         }
 
@@ -382,7 +596,7 @@ public class JdbcBlobTest {
 
             fail();
         }
-        catch (ArrayIndexOutOfBoundsException e) {
+        catch (IndexOutOfBoundsException e) {
             // No-op.
         }
 
@@ -391,7 +605,7 @@ public class JdbcBlobTest {
 
             fail();
         }
-        catch (ArrayIndexOutOfBoundsException e) {
+        catch (IndexOutOfBoundsException e) {
             // No-op.
         }
 
@@ -471,10 +685,140 @@ public class JdbcBlobTest {
         }
         catch (SQLException e) {
             // No-op.
-            System.out.println();
         }
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTruncateRO() throws Exception {
+        byte[] roArr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
+
+        Blob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(roArr, 2, 4));
+        assertArrayEquals(new byte[] {2, 3, 4, 5}, blob.getBytes(1, 
(int)blob.length()));
+
+        blob.truncate(2);
+        assertArrayEquals(new byte[] {2, 3}, blob.getBytes(1, 
(int)blob.length()));
+
+        assertArrayEquals(new byte[] {0, 1, 2, 3, 4, 5, 6, 7}, roArr);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFree() throws Exception {
+        Blob blob = new JdbcBlob(new byte[] {0, 1, 2, 3, 4, 5, 6, 7});
+
+        assertEquals(8, blob.length());
+
+        blob.free();
+
+        blob.free();
+
+        assertThrows(null, blob::length, SQLException.class, ERROR_BLOB_FREE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSetBinaryStream() throws Exception {
+        Blob blob = new JdbcBlob();
+
+        assertThrows(null, () -> blob.setBinaryStream(0), SQLException.class, 
null);
+        assertThrows(null, () -> blob.setBinaryStream(2), SQLException.class, 
null);
+
+        OutputStream os = blob.setBinaryStream(1);
+
+        os.write(0);
+        os.write(new byte[] {1, 2, 3, 4, 5, 6, 7});
+        os.close();
+        assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7}, blob.getBytes(1, 
(int)blob.length()));
+
+        os = blob.setBinaryStream(3);
+        os.write(new byte[] {20, 21, 22});
+        os.write(23);
+        os.close();
+        assertArrayEquals(new byte[]{0, 1, 20, 21, 22, 23, 6, 7}, 
blob.getBytes(1, (int)blob.length()));
+
+        os = blob.setBinaryStream(7);
+        os.write(new byte[] {30, 31, 32});
+        os.write(33);
+        os.close();
+        assertArrayEquals(new byte[]{0, 1, 20, 21, 22, 23, 30, 31, 32, 33}, 
blob.getBytes(1, (int)blob.length()));
+
+        blob.free();
+        assertThrows(null, () -> blob.setBinaryStream(2L), SQLException.class, 
ERROR_BLOB_FREE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSetBinaryStreamTooMuchData() throws Exception {
+        Blob blob = new JdbcBlob();
+
+        OutputStream os = blob.setBinaryStream(1);
+        os.write(1);
+
+        assertThrows(null, () -> {
+            // Use fake one byte array.
+            os.write(new byte[1], 0, MAX_ARRAY_SIZE);
+
+            return null;
+        }, IOException.class, "Too much data. Can't write more then 2147483639 
bytes.");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSetBinaryStreamRO() throws Exception {
+        byte[] roArr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
+
+        Blob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(roArr, 2, 4));
+        assertArrayEquals(new byte[] {2, 3, 4, 5}, blob.getBytes(1, 
(int)blob.length()));
+
+        OutputStream os = blob.setBinaryStream(5);
+        os.write(new byte[] {11, 22});
+        os.close();
+        assertArrayEquals(new byte[] {2, 3, 4, 5, 11, 22}, blob.getBytes(1, 
(int)blob.length()));
+
+        assertArrayEquals(new byte[] {0, 1, 2, 3, 4, 5, 6, 7}, roArr);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSetBinaryStreamWriteToTruncated() throws Exception {
+        Blob blob = new JdbcBlob();
+
+        OutputStream os = blob.setBinaryStream(1);
+
+        os.write(1);
+
+        blob.truncate(0);
+
+        assertThrows(null, () -> {
+            os.write(2);
+
+            return null;
+        }, IOException.class, "Writting beyond end of Blob, it probably was 
truncated after OutputStream " +
+                "was created [pos=1, blobLength=0]");
+
+        assertThrows(null, () -> {
+            os.write(new byte[] {2});
+
+            return null;
+        }, IOException.class, "Writting beyond end of Blob, it probably was 
truncated after OutputStream " +
+                "was created [pos=1, blobLength=0]");
+
+        os.close();
+    }
+
     /**
      * @param is Input stream.
      */
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index f5b133e5fad..bc9e274523c 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.jdbc.suite;
 
 import java.security.Security;
 import org.apache.ignite.common.RunningQueryInfoCheckInitiatorTest;
+import org.apache.ignite.internal.jdbc2.JdbcBinaryBufferTest;
 import org.apache.ignite.internal.jdbc2.JdbcBlobTest;
 import org.apache.ignite.internal.jdbc2.JdbcBulkLoadSelfTest;
 import org.apache.ignite.internal.jdbc2.JdbcClobTest;
@@ -127,6 +128,7 @@ import org.junit.runners.Suite;
     JdbcSchemaCaseSelfTest.class,
 
     JdbcClobTest.class,
+    JdbcBinaryBufferTest.class,
     JdbcBlobTest.class,
     org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class,
     JdbcThinStreamingNotOrderedSelfTest.class,
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
index d9664b8a089..a4b5f6e8383 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.jdbc.thin;
 
+import java.io.ByteArrayInputStream;
 import java.sql.BatchUpdateException;
+import java.sql.Blob;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -37,8 +39,8 @@ import static org.junit.Assert.assertArrayEquals;
  */
 public class JdbcThinBatchSelfTest extends 
JdbcThinAbstractDmlStatementSelfTest {
     /** SQL query. */
-    private static final String SQL_PREPARED = "insert into Person(_key, id, 
firstName, lastName, age) values " +
-        "(?, ?, ?, ?, ?)";
+    private static final String SQL_PREPARED = "insert into Person(_key, id, 
firstName, lastName, age, data) values " +
+        "(?, ?, ?, ?, ?, ?)";
 
     /** Statement. */
     private Statement stmt;
@@ -494,6 +496,14 @@ public class JdbcThinBatchSelfTest extends 
JdbcThinAbstractDmlStatementSelfTest
             pstmt.setString(paramCnt++, "Lastname" + i);
             pstmt.setInt(paramCnt++, 20 + i);
 
+            if (i % 2 == 0) {
+                Blob blob = conn.createBlob();
+                blob.setBytes(1, getBytes("White"));
+                pstmt.setBlob(paramCnt, blob);
+            }
+            else
+                pstmt.setBinaryStream(paramCnt, new 
ByteArrayInputStream(getBytes("Black")));
+
             pstmt.addBatch();
         }
 
@@ -525,6 +535,14 @@ public class JdbcThinBatchSelfTest extends 
JdbcThinAbstractDmlStatementSelfTest
             pstmt.setString(paramCnt++, "Lastname" + i);
             pstmt.setInt(paramCnt++, 20 + i);
 
+            if (i % 2 == 0) {
+                Blob blob = conn.createBlob();
+                blob.setBytes(1, getBytes("White" + i));
+                pstmt.setBlob(paramCnt, blob);
+            }
+            else
+                pstmt.setBinaryStream(paramCnt, new 
ByteArrayInputStream(getBytes("Black" + i)));
+
             pstmt.addBatch();
         }
 
@@ -534,6 +552,7 @@ public class JdbcThinBatchSelfTest extends 
JdbcThinAbstractDmlStatementSelfTest
         pstmt.setString(paramCnt++, "Name" + FAILED_IDX);
         pstmt.setString(paramCnt++, "Lastname" + FAILED_IDX);
         pstmt.setInt(paramCnt++, 20 + FAILED_IDX);
+        pstmt.setBinaryStream(paramCnt, new 
ByteArrayInputStream(getBytes("Black" + FAILED_IDX)));
 
         pstmt.addBatch();
 
@@ -543,6 +562,9 @@ public class JdbcThinBatchSelfTest extends 
JdbcThinAbstractDmlStatementSelfTest
         pstmt.setString(paramCnt++, "Name" + FAILED_IDX + 1);
         pstmt.setString(paramCnt++, "Lastname" + FAILED_IDX + 1);
         pstmt.setInt(paramCnt++, 20 + FAILED_IDX + 1);
+        Blob blob = conn.createBlob();
+        blob.setBytes(1, getBytes("White" + FAILED_IDX + 1));
+        pstmt.setBlob(paramCnt, blob);
 
         pstmt.addBatch();
 
@@ -917,6 +939,7 @@ public class JdbcThinBatchSelfTest extends 
JdbcThinAbstractDmlStatementSelfTest
         stmt.setString(paramCnt++, "Name" + personIdx);
         stmt.setString(paramCnt++, "Lastname" + personIdx);
         stmt.setInt(paramCnt++, 20 + personIdx);
+        stmt.setBinaryStream(paramCnt, new 
ByteArrayInputStream(getBytes("White")));
     }
 
     /**
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java
index 7982980491d..6adbebfcd51 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.jdbc.thin;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Reader;
 import java.io.Serializable;
 import java.math.BigDecimal;
@@ -30,6 +33,7 @@ import java.sql.NClob;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -67,6 +71,8 @@ import static java.sql.Types.TINYINT;
 import static java.sql.Types.VARCHAR;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static 
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
 
@@ -130,7 +136,7 @@ public class JdbcThinPreparedStatementSelfTest extends 
JdbcThinAbstractSelfTest
         o.bigVal = new BigDecimal(1);
         o.strVal = "str";
         o.arrVal = new byte[] {1};
-        o.blobVal = new byte[] {1};
+        o.blobVal = new byte[] {1, 2, 3};
         o.clobVal = "large str";
         o.dateVal = new Date(1);
         o.timeVal = new Time(1);
@@ -867,7 +873,7 @@ public class JdbcThinPreparedStatementSelfTest extends 
JdbcThinAbstractSelfTest
 
         Blob blob = conn.createBlob();
 
-        blob.setBytes(1, new byte[] {1});
+        blob.setBytes(1, new byte[] {1, 2, 3});
 
         stmt.setBlob(1, blob);
 
@@ -886,6 +892,209 @@ public class JdbcThinPreparedStatementSelfTest extends 
JdbcThinAbstractSelfTest
         assertFalse(rs.next());
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBlobWrittenViaOutputStream() throws Exception {
+        stmt = conn.prepareStatement(SQL_PART + " where blobVal is not 
distinct from ?");
+
+        Blob blob = conn.createBlob();
+
+        try (OutputStream out = blob.setBinaryStream(1)) {
+            out.write(new byte[] {1, 2, 3});
+        }
+
+        stmt.setBlob(1, blob);
+
+        checkStmtExec(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBlobSeeChangesDoneAfterAddToStatement() throws Exception {
+        stmt = conn.prepareStatement(SQL_PART + " where blobVal is not 
distinct from ?");
+
+        Blob blob = conn.createBlob();
+
+        stmt.setBlob(1, blob);
+
+        try (OutputStream out = blob.setBinaryStream(1)) {
+            out.write(new byte[] {1});
+            out.write(2);
+        }
+
+        blob.setBytes(3, new byte[] {3});
+
+        checkStmtExec(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBlobNull() throws Exception {
+        stmt = conn.prepareStatement(SQL_PART + " where blobVal is not 
distinct from ?");
+
+        stmt.setBlob(1, (Blob)null);
+        checkStmtExec(2);
+
+        stmt.setNull(1, BLOB);
+        checkStmtExec(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBinaryStreamKnownLength() throws Exception {
+        stmt = conn.prepareStatement(SQL_PART + " where blobVal is not 
distinct from ?");
+
+        ByteArrayInputStream stream = new ByteArrayInputStream(new byte[]{1, 
2, 3});
+
+        assertThrows(null, () -> {
+            stmt.setBinaryStream(1, stream, -1L);
+            return null;
+        }, SQLException.class, null);
+
+        assertThrows(null, () -> {
+            stmt.setBinaryStream(1, stream, -1);
+            return null;
+        }, SQLException.class, null);
+
+        assertThrows(null, () -> {
+            stmt.setBinaryStream(1, stream, (long)MAX_ARRAY_SIZE + 1);
+            return null;
+        }, SQLFeatureNotSupportedException.class, null);
+
+        assertThrows(null, () -> {
+            stmt.setBinaryStream(1, stream, MAX_ARRAY_SIZE + 1);
+            return null;
+        }, SQLFeatureNotSupportedException.class, null);
+
+        stmt.setBinaryStream(1, stream, 3);
+        checkStmtExec(1);
+
+        stream.reset();
+        stmt.setBinaryStream(1, stream, 3L);
+        checkStmtExec(1);
+
+        stream.reset();
+        stmt.setBinaryStream(1, stream, 10L);
+        assertThrows(null, () -> stmt.executeQuery(), SQLException.class, 
null);
+
+        stmt.setBinaryStream(1, null, 0);
+        checkStmtExec(2);
+
+        stmt.setBinaryStream(1, null, 0L);
+        checkStmtExec(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBinaryStreamUnknownLength() throws Exception {
+        stmt = conn.prepareStatement(SQL_PART + " where blobVal is not 
distinct from ?");
+
+        ByteArrayInputStream stream = new ByteArrayInputStream(new byte[]{1, 
2, 3});
+
+        stmt.setBinaryStream(1, stream);
+        checkStmtExec(1);
+
+        stmt.setBinaryStream(1, null);
+        checkStmtExec(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBinaryStreamThrows() throws Exception {
+        stmt = conn.prepareStatement(SQL_PART + " where blobVal is not 
distinct from ?");
+
+        stmt.setBinaryStream(1, new ThrowingInputStream());
+
+        assertThrows(null, () -> stmt.executeQuery(), SQLException.class, 
null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBlobStreamKnownLength() throws Exception {
+        stmt = conn.prepareStatement(SQL_PART + " where blobVal is not 
distinct from ?");
+
+        ByteArrayInputStream stream = new ByteArrayInputStream(new byte[]{1, 
2, 3});
+
+        assertThrows(null, () -> {
+            stmt.setBlob(1, stream, -1L);
+            return null;
+        }, SQLException.class, null);
+
+        assertThrows(null, () -> {
+            stmt.setBlob(1, stream, (long)MAX_ARRAY_SIZE + 1);
+            return null;
+        }, SQLFeatureNotSupportedException.class, null);
+
+        stmt.setBlob(1, stream, 3L);
+        checkStmtExec(1);
+
+        stream.reset();
+        stmt.setBlob(1, stream, 10L);
+        assertThrows(null, () -> stmt.executeQuery(), SQLException.class, 
null);
+
+        stmt.setBlob(1, null, 0L);
+        checkStmtExec(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBlobStreamUnknownLength() throws Exception {
+        stmt = conn.prepareStatement(SQL_PART + " where blobVal is not 
distinct from ?");
+
+        ByteArrayInputStream stream = new ByteArrayInputStream(new byte[]{1, 
2, 3});
+
+        stmt.setBlob(1, stream);
+        checkStmtExec(1);
+
+        stmt.setBlob(1, (Blob)null);
+        checkStmtExec(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBlobStreamThrows() throws Exception {
+        stmt = conn.prepareStatement(SQL_PART + " where blobVal is not 
distinct from ?");
+
+        stmt.setBinaryStream(1, new ThrowingInputStream());
+
+        assertThrows(null, () -> stmt.executeQuery(), SQLException.class, 
null);
+    }
+
+    /** */
+    private void checkStmtExec(int expectedId) throws SQLException {
+        ResultSet rs = stmt.executeQuery();
+
+        assertTrue(rs.next());
+        assertEquals(expectedId, rs.getInt("id"));
+        assertFalse(rs.next());
+    }
+
+    /** */
+    static class ThrowingInputStream extends InputStream {
+        /** {@inheritDoc} */
+        @Override public int read() throws IOException {
+            throw new IOException();
+        }
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -1089,36 +1298,6 @@ public class JdbcThinPreparedStatementSelfTest extends 
JdbcThinAbstractSelfTest
             }
         });
 
-        checkNotSupported(new RunnableX() {
-            @Override public void runx() throws Exception {
-                stmt.setBinaryStream(1, null);
-            }
-        });
-
-        checkNotSupported(new RunnableX() {
-            @Override public void runx() throws Exception {
-                stmt.setBinaryStream(1, null, 0);
-            }
-        });
-
-        checkNotSupported(new RunnableX() {
-            @Override public void runx() throws Exception {
-                stmt.setBinaryStream(1, null, 0L);
-            }
-        });
-
-        checkNotSupported(new RunnableX() {
-            @Override public void runx() throws Exception {
-                stmt.setBlob(1, (InputStream)null);
-            }
-        });
-
-        checkNotSupported(new RunnableX() {
-            @Override public void runx() throws Exception {
-                stmt.setBlob(1, null, 0L);
-            }
-        });
-
         checkNotSupported(new RunnableX() {
             @Override public void runx() throws Exception {
                 stmt.setCharacterStream(1, null);
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
index 60210c335db..016850d125e 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
@@ -144,7 +144,7 @@ public class JdbcThinResultSetSelfTest extends 
JdbcThinAbstractSelfTest {
         o.bigVal = new BigDecimal(1);
         o.strVal = "1";
         o.arrVal = new byte[] {1};
-        o.blobVal = new byte[] {1};
+        o.blobVal = new byte[] {1, 2, 3};
         o.clobVal = "str";
         o.dateVal = new Date(1, 1, 1);
         o.timeVal = new Time(1, 1, 1);
@@ -611,10 +611,63 @@ public class JdbcThinResultSetSelfTest extends 
JdbcThinAbstractSelfTest {
         assertTrue(rs.next());
 
         Blob blob = rs.getBlob("blobVal");
-        Assert.assertArrayEquals(blob.getBytes(1, (int)blob.length()), new 
byte[] {1});
+        Assert.assertArrayEquals(blob.getBytes(1, (int)blob.length()), new 
byte[] {1, 2, 3});
 
         blob = rs.getBlob(16);
-        Assert.assertArrayEquals(blob.getBytes(1, (int)blob.length()), new 
byte[] {1});
+        Assert.assertArrayEquals(blob.getBytes(1, (int)blob.length()), new 
byte[] {1, 2, 3});
+
+        assertFalse(rs.next());
+    }
+
+    /**
+     * Checks that requesting a {@link Blob} for the {@code strVal} column fails with {@link SQLException}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBlobForNonBinaryType() throws Exception {
+        String expMsg = "Cannot convert to Blob [colIdx=10]";
+
+        ResultSet res = stmt.executeQuery(SQL);
+
+        assertTrue(res.next());
+
+        assertThrows(null, () -> res.getBlob("strVal"), SQLException.class, expMsg);
+
+        assertFalse(res.next());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBlobChangeAfterSelect() throws Exception {
+        ResultSet rs = stmt.executeQuery(SQL);
+
+        assertTrue(rs.next());
+
+        Blob blob1 = rs.getBlob("blobVal");
+        Blob blob2 = rs.getBlob("blobVal");
+        Assert.assertArrayEquals(blob1.getBytes(1, (int)blob1.length()), new 
byte[]{1, 2, 3});
+        Assert.assertArrayEquals(blob2.getBytes(1, (int)blob2.length()), new 
byte[]{1, 2, 3});
+
+        blob1.setBytes(4, new byte[]{4});
+        blob2.truncate(2);
+
+        Assert.assertArrayEquals(blob1.getBytes(1, (int)blob1.length()), new 
byte[]{1, 2, 3, 4});
+        Assert.assertArrayEquals(blob2.getBytes(1, (int)blob2.length()), new 
byte[]{1, 2});
+    }
+
+    /**
+     * Checks that {@code getBlob} returns {@code null}, both by column label and by column index,
+     * for the row selected with {@code id = 2}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBlobNull() throws Exception {
+        ResultSet rs = stmt.executeQuery("select blobVal from testObject where 
id = 2");
+
+        assertTrue(rs.next());
+
+        Blob blob = rs.getBlob("blobVal");
+        Assert.assertNull(blob);
+
+        blob = rs.getBlob(1);
+        Assert.assertNull(blob);
 
         assertFalse(rs.next());
     }
@@ -637,6 +690,109 @@ public class JdbcThinResultSetSelfTest extends 
JdbcThinAbstractSelfTest {
         assertFalse(rs.next());
     }
 
+    /**
+     * Checks reading, mark/reset and skip on the stream returned by {@code getBinaryStream}
+     * for the {@code blobVal} column.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBinaryStream() throws Exception {
+        ResultSet rs = stmt.executeQuery(SQL);
+
+        assertTrue(rs.next());
+
+        InputStream is = rs.getBinaryStream("blobVal");
+
+        assertEquals(1, is.read());
+
+        assertTrue(is.markSupported());
+        // Remember the position right after the first byte.
+        is.mark(100);
+
+        byte[] res = new byte[]{33, 33, 33};
+        assertEquals(1, is.read(res, 1, 1));
+        Assert.assertArrayEquals(res, new byte[] {33, 2, 33});
+
+        is.reset();
+
+        // Only two bytes are left after the marked position, so the array is filled partially.
+        assertEquals(2, is.read(res));
+        Assert.assertArrayEquals(res, new byte[] {2, 3, 33});
+
+        is.reset();
+
+        // Non-positive skip counts are expected to skip nothing.
+        assertEquals(0, is.skip(-1));
+        assertEquals(0, is.skip(0));
+        assertEquals(1, is.skip(1));
+        assertEquals(3, is.read());
+
+        // End of data is reported by both read variants.
+        assertEquals(-1, is.read());
+        assertEquals(-1, is.read(res));
+
+        assertFalse(rs.next());
+    }
+
+    /**
+     * Checks that two streams obtained for the same column keep independent read positions and marks.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBinaryStreamInstancesAreIndependent() throws Exception {
+        ResultSet rs = stmt.executeQuery(SQL);
+
+        assertTrue(rs.next());
+
+        InputStream is1 = rs.getBinaryStream(16);
+        InputStream is2 = rs.getBinaryStream(16);
+
+        // Advance the first stream by two bytes.
+        assertEquals(1, is1.read());
+        assertEquals(1, is1.skip(1));
+
+        // The second stream still starts from the beginning.
+        assertEquals(1, is2.read());
+        is2.mark(100);
+        assertEquals(2, is2.read());
+        assertEquals(3, is2.read());
+
+        is2.reset();
+
+        // Resetting the second stream must not affect the first one.
+        assertEquals(3, is1.read());
+
+        assertEquals(2, is2.read());
+    }
+
+    /**
+     * Checks that {@code getBinaryStream} and {@code getBytes} return the same bytes
+     * for the {@code strVal} column.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBinaryStreamForNonBinaryType() throws Exception {
+        ResultSet rs = stmt.executeQuery(SQL);
+
+        assertTrue(rs.next());
+
+        InputStream streamed = rs.getBinaryStream("strVal");
+        byte[] direct = rs.getBytes("strVal");
+
+        byte[] expected = "1".getBytes();
+
+        Assert.assertArrayEquals(expected, streamed.readAllBytes());
+        Assert.assertArrayEquals(expected, direct);
+
+        assertFalse(rs.next());
+    }
+
+    /**
+     * Checks that {@code getBinaryStream} returns {@code null}, both by column label and by column index,
+     * for the row selected with {@code id = 2}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBinaryStreamNull() throws Exception {
+        ResultSet rs = stmt.executeQuery("select blobVal from testObject where id = 2");
+
+        assertTrue(rs.next());
+
+        InputStream byLabel = rs.getBinaryStream("blobVal");
+        InputStream byIdx = rs.getBinaryStream(1);
+
+        Assert.assertNull(byLabel);
+        Assert.assertNull(byIdx);
+
+        assertFalse(rs.next());
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -753,6 +909,38 @@ public class JdbcThinResultSetSelfTest extends 
JdbcThinAbstractSelfTest {
         Assert.assertEquals("Result count mismatch", 1, cnt);
     }
 
+    /**
+     * Checks the types returned by the {@code getObject} variants for the {@code blobVal} column
+     * and that conversion of the {@code strVal} column to {@link Blob} is rejected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testObjectForBinaryType() throws Exception {
+        ResultSet rs = stmt.executeQuery(SQL);
+
+        assertTrue(rs.next());
+
+        // Without a target class the value is expected to be a byte array.
+        Object byLabel = rs.getObject("blobVal");
+        Object byIdx = rs.getObject(16);
+
+        assertTrue(byLabel instanceof byte[]);
+        assertTrue(byIdx instanceof byte[]);
+
+        // Explicit byte[] target class.
+        Object bytesByLabel = rs.getObject("blobVal", byte[].class);
+        Object bytesByIdx = rs.getObject(16, byte[].class);
+
+        assertTrue(bytesByLabel instanceof byte[]);
+        assertTrue(bytesByIdx instanceof byte[]);
+
+        // Explicit Blob target class.
+        Object blobByLabel = rs.getObject("blobVal", Blob.class);
+        Object blobByIdx = rs.getObject(16, Blob.class);
+
+        assertTrue(blobByLabel instanceof Blob);
+        assertTrue(blobByIdx instanceof Blob);
+
+        String expMsg = "Cannot convert to Blob [colIdx=10]";
+
+        assertThrows(null, () -> rs.getObject("strVal", Blob.class), SQLException.class, expMsg);
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -854,18 +1042,6 @@ public class JdbcThinResultSetSelfTest extends 
JdbcThinAbstractSelfTest {
             }
         });
 
-        checkNotSupported(new RunnableX() {
-            @Override public void runx() throws Exception {
-                rs.getBinaryStream(1);
-            }
-        });
-
-        checkNotSupported(new RunnableX() {
-            @Override public void runx() throws Exception {
-                rs.getBinaryStream("id");
-            }
-        });
-
         checkNotSupported(new RunnableX() {
             @Override public void runx() throws Exception {
                 rs.getCharacterStream(1);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
index a28c0682474..826bc7df9e0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.binary;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectOutput;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
@@ -42,6 +43,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
 
 /**
  * Binary writer implementation.
@@ -53,6 +55,9 @@ public class BinaryWriterExImpl implements BinaryWriter, 
BinaryRawWriterEx, Obje
     /** Initial capacity. */
     private static final int INIT_CAP = 1024;
 
+    /** Default buffer size for reading from streams. */
+    public static final int DEFAULT_BUFFER_SIZE = 8 * 1024;
+
     /** */
     private final BinaryContext ctx;
 
@@ -1880,6 +1885,65 @@ public class BinaryWriterExImpl implements BinaryWriter, 
BinaryRawWriterEx, Obje
         fieldCnt++;
     }
 
+    /**
+     * Write byte array from the InputStream.
+     *
+     * <p>If {@code limit >= 0} then no more than {@code limit} bytes will be read and written.
+     * If {@code limit == -1} then it will try to read and write all bytes until the end of the stream.
+     *
+     * <p>In any case if actual number of bytes is greater than {@code MAX_ARRAY_SIZE}
+     * then exception will be thrown.
+     *
+     * @param in InputStream.
+     * @param limit Max length of data to be read from the stream or -1 if all data should be read.
+     * @return Number of bytes written.
+     * @throws BinaryObjectException If an I/O error occurs or stream contains more than {@code MAX_ARRAY_SIZE} bytes.
+     */
+    public int writeByteArray(InputStream in, int limit) throws BinaryObjectException {
+        if (limit != -1)
+            out.unsafeEnsure(1 + 4 + limit);
+        else
+            out.unsafeEnsure(1 + 4);
+
+        out.unsafeWriteByte(GridBinaryMarshaller.BYTE_ARR);
+
+        // Leave room for the 4-byte length: it is written after the loop, once the actual count is known.
+        int lengthPos = out.position();
+
+        out.position(lengthPos + 4);
+
+        int written = 0;
+
+        byte[] buf = new byte[limit > 0 ? Math.min(limit, DEFAULT_BUFFER_SIZE) : DEFAULT_BUFFER_SIZE];
+
+        while (limit == -1 || written < limit) {
+            int read;
+            try {
+                read = limit > 0
+                        ? in.read(buf, 0, Math.min(buf.length, limit - written))
+                        : in.read(buf, 0, buf.length);
+            }
+            catch (IOException e) {
+                throw new BinaryObjectException(e);
+            }
+
+            if (read == -1)
+                break;
+
+            // Compare via subtraction: 'read + written' may overflow int when 'written' is close to MAX_ARRAY_SIZE.
+            if (read > MAX_ARRAY_SIZE - written)
+                throw new BinaryObjectException("Too much data. Can't write more then " + MAX_ARRAY_SIZE + " bytes from stream.");
+
+            out.writeByteArray(buf, 0, read);
+
+            written += read;
+        }
+
+        // Go back and fill in the length, then restore the position to the end of the written data.
+        out.position(lengthPos);
+        out.unsafeWriteInt(written);
+        out.position(out.position() + written);
+
+        return written;
+    }
+
     /**
      * @param schemaId Schema ID.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
index 165bb8801e2..3beb93844cc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
@@ -36,7 +36,7 @@ public abstract class BinaryAbstractOutputStream extends 
BinaryAbstractStream
      * OutOfMemoryError: Requested array size exceeds VM limit
      * @see java.util.ArrayList#MAX_ARRAY_SIZE
      */
-    protected static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+    public static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 
     /** {@inheritDoc} */
     @Override public void writeByte(byte val) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index 5f68115abba..0de9affbc1f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -876,7 +876,7 @@ public class JdbcThinConnection implements Connection {
     @Override public Blob createBlob() throws SQLException {
         ensureNotClosed();
 
-        return new JdbcBlob(new byte[0]);
+        return new JdbcBlob();
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
index d64aaab423e..0aa55e6c824 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
@@ -39,6 +39,7 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Calendar;
+import org.apache.ignite.internal.processors.odbc.SqlInputStreamWrapper;
 import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcMetaParamsRequest;
@@ -46,6 +47,9 @@ import 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcMetaParamsResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
 
+import static java.sql.Types.BINARY;
+import static 
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
+
 /**
  * JDBC prepared statement implementation.
  */
@@ -213,9 +217,7 @@ public class JdbcThinPreparedStatement extends 
JdbcThinStatement implements Prep
 
     /** {@inheritDoc} */
     @Override public void setBinaryStream(int paramIdx, InputStream x, int 
length) throws SQLException {
-        ensureNotClosed();
-
-        throw new SQLFeatureNotSupportedException("Streams are not 
supported.");
+        setBinaryStream(paramIdx, x, (long)length);
     }
 
     /** {@inheritDoc} */
@@ -305,7 +307,7 @@ public class JdbcThinPreparedStatement extends 
JdbcThinStatement implements Prep
 
     /** {@inheritDoc} */
     @Override public void setBlob(int paramIdx, Blob x) throws SQLException {
-        setBytes(paramIdx, x.getBytes(1, (int)x.length()));
+        setArgument(paramIdx, x);
     }
 
     /** {@inheritDoc} */
@@ -411,9 +413,7 @@ public class JdbcThinPreparedStatement extends 
JdbcThinStatement implements Prep
 
     /** {@inheritDoc} */
     @Override public void setBlob(int paramIdx, InputStream inputStream, long 
length) throws SQLException {
-        ensureNotClosed();
-
-        throw new SQLFeatureNotSupportedException("SQL-specific types are not 
supported.");
+        setBinaryStream(paramIdx, inputStream, length);
     }
 
     /** {@inheritDoc} */
@@ -446,7 +446,18 @@ public class JdbcThinPreparedStatement extends 
JdbcThinStatement implements Prep
     @Override public void setBinaryStream(int paramIdx, InputStream x, long 
length) throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Streams are not 
supported.");
+        // Zero is a valid length (nothing is read from the stream); only negative values are rejected.
+        if (length < 0)
+            throw new SQLException("Invalid argument. Length should be greater than or equal to 0.");
+
+        // The stream content is sent as a single byte array, hence the array size limit.
+        if (length > MAX_ARRAY_SIZE) {
+            throw new SQLFeatureNotSupportedException("Invalid argument. InputStreams with length greater than " + MAX_ARRAY_SIZE +
+                    " are not supported.");
+        }
+
+        if (x == null)
+            setNull(paramIdx, BINARY);
+        else
+            setArgument(paramIdx, new SqlInputStreamWrapper(x, (int)length));
     }
 
     /** {@inheritDoc} */
@@ -467,7 +478,10 @@ public class JdbcThinPreparedStatement extends 
JdbcThinStatement implements Prep
     @Override public void setBinaryStream(int paramIdx, InputStream x) throws 
SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Streams are not 
supported.");
+        if (x == null)
+            setNull(paramIdx, BINARY);
+        else
+            setArgument(paramIdx, new SqlInputStreamWrapper(x));
     }
 
     /** {@inheritDoc} */
@@ -493,9 +507,7 @@ public class JdbcThinPreparedStatement extends 
JdbcThinStatement implements Prep
 
     /** {@inheritDoc} */
     @Override public void setBlob(int paramIdx, InputStream inputStream) 
throws SQLException {
-        ensureNotClosed();
-
-        throw new SQLFeatureNotSupportedException("SQL-specific types are not 
supported.");
+        setBinaryStream(paramIdx, inputStream);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
index 1ba7affc761..58cb64c04e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.jdbc.thin;
 
+import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.io.Reader;
 import java.math.BigDecimal;
@@ -46,6 +47,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.ignite.internal.jdbc2.JdbcBinaryBuffer;
 import org.apache.ignite.internal.jdbc2.JdbcBlob;
 import org.apache.ignite.internal.jdbc2.JdbcClob;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
@@ -472,6 +474,8 @@ public class JdbcThinResultSet implements ResultSet {
 
         if (cls == byte[].class)
             return (byte[])val;
+        else if (cls == JdbcBinaryBuffer.class)
+            return ((JdbcBinaryBuffer)val).bytes();
         else if (cls == Byte.class)
             return new byte[] {(byte)val};
         else if (cls == Short.class) {
@@ -587,9 +591,15 @@ public class JdbcThinResultSet implements ResultSet {
 
     /** {@inheritDoc} */
     @Override public InputStream getBinaryStream(int colIdx) throws 
SQLException {
-        ensureNotClosed();
+        Object val = getValue(colIdx);
+
+        if (val == null)
+            return null;
 
-        throw new SQLFeatureNotSupportedException("Stream are not supported.");
+        if (val instanceof JdbcBinaryBuffer)
+            return ((JdbcBinaryBuffer)val).inputStream();
+        else
+            return new ByteArrayInputStream(getBytes(colIdx));
     }
 
     /** {@inheritDoc} */
@@ -699,9 +709,7 @@ public class JdbcThinResultSet implements ResultSet {
 
     /** {@inheritDoc} */
     @Override public InputStream getBinaryStream(String colLb) throws 
SQLException {
-        ensureNotClosed();
-
-        throw new SQLFeatureNotSupportedException("Streams are not 
supported.");
+        return getBinaryStream(findColumn(colLb));
     }
 
     /** {@inheritDoc} */
@@ -738,14 +746,19 @@ public class JdbcThinResultSet implements ResultSet {
 
     /** {@inheritDoc} */
     @Override public Object getObject(int colIdx) throws SQLException {
-        return getValue(colIdx);
+        Object val = getValue(colIdx);
+
+        if (val instanceof JdbcBinaryBuffer)
+            return ((JdbcBinaryBuffer)val).bytes();
+        else
+            return val;
     }
 
     /** {@inheritDoc} */
     @Override public Object getObject(String colLb) throws SQLException {
         int colIdx = findColumn(colLb);
 
-        return getValue(colIdx);
+        return getObject(colIdx);
     }
 
     /** {@inheritDoc} */
@@ -1298,12 +1311,22 @@ public class JdbcThinResultSet implements ResultSet {
 
     /** {@inheritDoc} */
     @Override public Blob getBlob(int colIdx) throws SQLException {
-        return new JdbcBlob(getBytes(colIdx));
+        Object val = getValue(colIdx);
+
+        if (val == null)
+            return null;
+
+        if (!(val instanceof JdbcBinaryBuffer))
+            throw new SQLException("Cannot convert to Blob [colIdx=" + colIdx 
+ "]");
+
+        return new JdbcBlob(((JdbcBinaryBuffer)val).shallowCopy());
     }
 
     /** {@inheritDoc} */
     @Override public Clob getClob(int colIdx) throws SQLException {
-        return new JdbcClob(getString(colIdx));
+        String str = getString(colIdx);
+
+        return str != null ? new JdbcClob(str) : null;
     }
 
     /** {@inheritDoc} */
@@ -1327,12 +1350,14 @@ public class JdbcThinResultSet implements ResultSet {
 
     /** {@inheritDoc} */
     @Override public Blob getBlob(String colLb) throws SQLException {
-        return new JdbcBlob(getBytes(colLb));
+        return getBlob(findColumn(colLb));
     }
 
     /** {@inheritDoc} */
     @Override public Clob getClob(String colLb) throws SQLException {
-        return new JdbcClob(getString(colLb));
+        String str = getString(colLb);
+
+        return str != null ? new JdbcClob(str) : null;
     }
 
     /** {@inheritDoc} */
@@ -1813,6 +1838,8 @@ public class JdbcThinResultSet implements ResultSet {
             return getTimestamp(colIdx);
         else if (targetCls == byte[].class)
             return getBytes(colIdx);
+        else if (targetCls == Blob.class)
+            return getBlob(colIdx);
         else if (targetCls == URL.class)
             return getURL(colIdx);
         else {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBuffer.java
new file mode 100644
index 00000000000..d9a57d47cb0
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBuffer.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Objects;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static 
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
+
+/**
+ * Buffer storing the binary data.
+ *
+ * <p>Buffer can start working in read-only mode if created wrapping the 
existing byte array which
+ * can not be modified. Any write operation switches it lazily to the 
read-write mode. This allows
+ * to prevent the unnecessary data copying.
+ *
+ * <p>Data may be read via the InputStream API and modified via the 
OutputStream one. Changes done via
+ * OutputStream are visible via the InputStream even if InputStream is created 
before changes done.
+ *
+ * <p>InputStream and OutputStream created remain valid even if the underlying 
data storage changed from
+ * read-only to read-write.
+ *
+ * <p>Note however that implementation is not thread-safe.
+ */
+public class JdbcBinaryBuffer {
+    /** Byte array storing data. */
+    private byte[] arr;
+
+    /** Offset the data starts in the array. */
+    private int off;
+
+    /** The length of data. */
+    private int len;
+
+    /** Read only flag. */
+    private boolean isReadOnly;
+
+    /** Minimum buffer capacity. */
+    public static final int MIN_CAP = 256;
+
+    /**
+     * Create buffer which wraps the existing byte array and starts working in the read-only mode.
+     * The array is wrapped as is, without copying.
+     *
+     * @param arr The byte array to be wrapped.
+     * @param off The offset to the first byte to be wrapped.
+     * @param len The length in bytes of the data to be wrapped.
+     * @return New read-only buffer.
+     */
+    public static JdbcBinaryBuffer createReadOnly(byte[] arr, int off, int 
len) {
+        return new JdbcBinaryBuffer(arr, off, len, true);
+    }
+
+    /**
+     * Create buffer which takes ownership of and wraps data in the existing byte array and
+     * starts working in the read-write mode. The whole array is treated as buffer data.
+     *
+     * @param arr The byte array to be wrapped.
+     * @return New read-write buffer.
+     */
+    public static JdbcBinaryBuffer createReadWrite(byte[] arr) {
+        return new JdbcBinaryBuffer(arr, 0, arr.length, false);
+    }
+
+    /**
+     * Create empty buffer which starts working in the read-write mode.
+     * The underlying array is allocated with the {@link #MIN_CAP} capacity.
+     *
+     * @return New empty read-write buffer.
+     */
+    public static JdbcBinaryBuffer createReadWrite() {
+        return new JdbcBinaryBuffer(new byte[MIN_CAP], 0, 0, false);
+    }
+
+    /**
+     * Private constructor. Stores the passed array reference without copying it.
+     *
+     * @param arr The byte array to be wrapped.
+     * @param off The offset to the first byte to be wrapped.
+     * @param len The length in bytes of the data to be wrapped.
+     * @param isReadOnly The read-only flag.
+     */
+    private JdbcBinaryBuffer(byte[] arr, int off, int len, boolean isReadOnly) 
{
+        this.arr = arr;
+        this.off = off;
+        this.len = len;
+        this.isReadOnly = isReadOnly;
+    }
+
+    /**
+     * Create shallow read-only copy of this buffer. The copy references the same underlying
+     * array with the same offset and length.
+     *
+     * @return New read-only buffer sharing the data of this one.
+     */
+    public JdbcBinaryBuffer shallowCopy() {
+        return new JdbcBinaryBuffer(arr, off, len, true);
+    }
+
+    /**
+     * Provide InputStream through which the data can be read starting from the
+     * beginning.
+     *
+     * <p>Stream is not limited meaning that it would return any new data
+     * written to the buffer after stream creation.
+     *
+     * @return InputStream instance.
+     */
+    public InputStream inputStream() {
+        return new BufferInputStream();
+    }
+
+    /**
+     * Copy the buffer data into a newly allocated byte array.
+     *
+     * @return Byte array of {@link #length()} bytes containing buffer data.
+     */
+    public byte[] bytes() {
+        byte[] res = new byte[len];
+
+        // Nothing to copy for an empty buffer.
+        if (len > 0)
+            U.arrayCopy(arr, off, res, 0, len);
+
+        return res;
+    }
+
+    /**
+     * Provide OutputStream through which the data can be written to the buffer starting from
+     * the (zero-based) {@code pos} position.
+     *
+     * @param pos The zero-based offset to the first byte to be written. Must not be negative
+     *            or greater than total count of bytes in buffer.
+     *
+     * @return OutputStream instance.
+     */
+    OutputStream outputStream(int pos) {
+        return new BufferOutputStream(pos);
+    }
+
+    /**
+     * Provide InputStream through which no more than {@code len} bytes can be read
+     * from the buffer starting from the specified zero-based position {@code pos}.
+     *
+     * @param pos The zero-based offset to the first byte to be retrieved. Must not be negative
+     *            or greater than total count of bytes in buffer.
+     * @param len The length in bytes of the data to be retrieved. Must not be negative.
+     * @return InputStream instance.
+     */
+    InputStream inputStream(int pos, int len) {
+        return new BufferInputStream(pos, pos + len);
+    }
+
+    /**
+     * Truncate data in this buffer to the specified length.
+     *
+     * <p>The underlying array is always replaced by a new one of at least {@link #MIN_CAP} bytes,
+     * which also switches the buffer to the read-write mode.
+     *
+     * @param len New length.
+     */
+    void truncate(int len) {
+        int newCap = Math.max(MIN_CAP, len);
+
+        // The length must be updated first: reallocation copies exactly 'this.len' bytes.
+        this.len = len;
+
+        reallocate(newCap);
+    }
+
+    /**
+     * @return Length in bytes of data in this buffer.
+     */
+    int length() {
+        return len;
+    }
+
+    /**
+     * Read up to {@code resLen} bytes from this buffer starting at the position {@code pos}.
+     *
+     * @param pos Zero-based position in the buffer data to read from.
+     * @param resBuf Output byte array to write to.
+     * @param resOff Offset in the output array to start write to.
+     * @param resLen Number of bytes to read.
+     * @return Number of bytes read. -1 if end of data reached.
+     */
+    int read(int pos, byte[] resBuf, int resOff, int resLen) {
+        Objects.checkFromIndexSize(resOff, resLen, resBuf.length);
+
+        if (pos >= len)
+            return -1;
+
+        // Never read past the end of data.
+        int cnt = Math.min(resLen, len - pos);
+
+        U.arrayCopy(arr, off + pos, resBuf, resOff, cnt);
+
+        return cnt;
+    }
+
+    /**
+     * Writes {@code inpLen} bytes from the specified byte array {@code inpBuf} starting at offset
+     * {@code inpOff} to this buffer at the specified position {@code pos}.
+     *
+     * @param pos Zero-based position in the buffer data to write to.
+     * @param inpBuf Input byte array.
+     * @param inpOff Start offset in the input array.
+     * @param inpLen Number of bytes to write.
+     * @throws IOException If the data would not fit into {@code MAX_ARRAY_SIZE} bytes.
+     */
+    void write(int pos, byte[] inpBuf, int inpOff, int inpLen) throws 
IOException {
+        // Overflow-safe form of 'pos + inpLen > MAX_ARRAY_SIZE'.
+        if (MAX_ARRAY_SIZE - pos < inpLen)
+            throw new IOException("Too much data. Can't write more then " + 
MAX_ARRAY_SIZE + " bytes.");
+
+        Objects.checkFromIndexSize(inpOff, inpLen, inpBuf.length);
+
+        updateLength(Math.max(pos + inpLen, len));
+
+        // NOTE(review): 'pos' is used without 'off' here - presumably 'off' is always 0 once the
+        // buffer is in the read-write mode (updateLength reallocates a read-only buffer); confirm.
+        U.arrayCopy(inpBuf, inpOff, arr, pos, inpLen);
+    }
+
+    /**
+     * Read a single byte from this buffer at the specified position {@code pos}.
+     *
+     * @param pos Zero-based position in the buffer data.
+     * @return Byte read from the buffer as a value in the range 0..255. -1 if end of data reached.
+     */
+    int read(int pos) {
+        // Mask to return the byte as an unsigned value.
+        return pos >= len ? -1 : arr[pos + off] & 0xff;
+    }
+
+    /**
+     * Write a byte to this buffer to specified position {@code pos}.
+     *
+     * <p>The byte to be written is the eight low-order bits of the
+     * argument {@code b}. The 24 high-order bits of {@code b} are ignored.
+     *
+     * @param pos Zero-based position in the buffer data to write to.
+     * @param b Byte to write.
+     * @throws IOException If the data would not fit into {@code MAX_ARRAY_SIZE} bytes.
+     */
+    void write(int pos, int b) throws IOException {
+        if (MAX_ARRAY_SIZE - pos < 1)
+            throw new IOException("Too much data. Can't write more then " + 
MAX_ARRAY_SIZE + " bytes.");
+
+        updateLength(Math.max(pos + 1, len));
+
+        arr[pos] = (byte)b;
+    }
+
+    /**
+     * Update data length. Reallocate the underlying array if it is too small
+     * or if the buffer is still in the read-only mode.
+     *
+     * @param newLen The new data length the buffer should be able to hold.
+     */
+    private void updateLength(int newLen) {
+        boolean tooSmall = newLen > arr.length;
+
+        if (tooSmall || isReadOnly) {
+            int newCap = capacity(arr.length, newLen);
+
+            reallocate(newCap);
+        }
+
+        len = newLen;
+    }
+
+    /**
+     * Calculate new capacity by doubling the current one until the required capacity fits.
+     *
+     * <p>The result is never less than {@code MIN_CAP}. Whenever doubling overflows or exceeds
+     * {@code MAX_ARRAY_SIZE}, {@code MAX_ARRAY_SIZE} is returned.
+     *
+     * @param cap Current capacity.
+     * @param reqCap Required new capacity.
+     * @return New capacity.
+     */
+    protected static int capacity(int cap, int reqCap) {
+        if (reqCap <= MIN_CAP)
+            return MIN_CAP;
+
+        int resCap = Math.max(cap, MIN_CAP);
+
+        while (resCap < reqCap) {
+            resCap <<= 1;
+
+            // Doubling may either overflow to a negative value or stay positive but exceed
+            // the maximum array size (e.g. 0x3FFFFFFF << 1); clamp in both cases.
+            if (resCap < 0 || resCap > MAX_ARRAY_SIZE)
+                return MAX_ARRAY_SIZE;
+        }
+
+        return resCap;
+    }
+
+    /**
+     * Allocate the new underlying array and copy the current data to its beginning.
+     * After that the buffer works in the read-write mode with zero offset.
+     *
+     * @param newCapacity New capacity.
+     */
+    private void reallocate(int newCapacity) {
+        byte[] grown = new byte[newCapacity];
+
+        // Copy before the fields are switched: the source is the old array and the old offset.
+        U.arrayCopy(arr, off, grown, 0, len);
+
+        isReadOnly = false;
+        off = 0;
+        arr = grown;
+    }
+
+    /**
+     * Input stream to read data from buffer.
+     */
+    private class BufferInputStream extends InputStream {
+        /** Max position in the buffer. -1 means no max position (for 
unlimited stream). */
+        private final int limit;
+
+        /** Current position in the buffer. */
+        private int pos;
+
+        /** Remembered buffer position at the moment the {@link 
InputStream#mark} is called. */
+        private int markedPos;
+
+        /**
+         * Create unlimited stream to read all data from the buffer starting from the beginning.
+         */
+        private BufferInputStream() {
+            // -1 stands for "no limit".
+            this(0, -1);
+        }
+
+        /**
+         * Create stream to read data from the buffer starting from the 
specified {@code start}
+         * zero-based position.
+         *
+         * @param start The zero-based offset to the first byte to be 
retrieved.
+         * @param limit The maximim length in bytes of the data to be 
retrieved. Unlimited if null.
+         */
+        private BufferInputStream(int start, int limit) {
+            pos = start;
+
+            markedPos = start;
+
+            this.limit = limit;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int read() {
+            if (limit != -1 && pos >= limit)
+                return -1;
+
+            int res = JdbcBinaryBuffer.this.read(pos);
+
+            if (res != -1)
+                pos++;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int read(byte[] res, int off, int cnt) {
+            Objects.checkFromIndexSize(off, cnt, res.length);
+
+            int toRead = cnt;
+
+            if (limit != -1) {
+                if (pos >= limit)
+                    return -1;
+
+                toRead = Math.min(limit - pos, cnt);
+            }
+
+            int read = JdbcBinaryBuffer.this.read(pos, res, off, toRead);
+
+            if (read != -1)
+                pos += read;
+
+            return read;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean markSupported() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void reset() {
+            pos = markedPos;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void mark(int readlimit) {
+            markedPos = pos;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long skip(long n) {
+            if (n <= 0)
+                return 0;
+
+            int step = Math.min((int)Math.min(n, MAX_ARRAY_SIZE),
+                    limit == -1 ? len - pos : limit - pos);
+
+            pos += step;
+
+            return step;
+        }
+    }
+
+    /**
+     * Output stream to write data to buffer.
+     */
+    private class BufferOutputStream extends OutputStream {
+        /** Position in the buffer at which the next write happens. */
+        private int pos;
+
+        /**
+         * Create stream to write data to the buffer starting from the specified position {@code pos}.
+         *
+         * @param pos Starting position (zero-based).
+         */
+        private BufferOutputStream(int pos) {
+            this.pos = pos;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(int b) throws IOException {
+            ensureNotBeyondEnd();
+
+            JdbcBinaryBuffer.this.write(pos++, b);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(byte[] b, int off, int len) throws IOException {
+            ensureNotBeyondEnd();
+
+            JdbcBinaryBuffer.this.write(pos, b, off, len);
+
+            pos += len;
+        }
+
+        /**
+         * Check that the current position is not greater than the current data length.
+         *
+         * @throws IOException If the position is beyond the end of data.
+         */
+        private void ensureNotBeyondEnd() throws IOException {
+            if (pos <= length())
+                return;
+
+            throw new IOException("Writting beyond end of Blob, it probably was truncated " +
+                "after OutputStream was created " +
+                "[pos=" + pos + ", blobLength=" + length() + "]");
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBlob.java 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBlob.java
index ef2b6b5bd3a..682c0212a20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBlob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBlob.java
@@ -18,54 +18,79 @@
 package org.apache.ignite.internal.jdbc2;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.sql.Blob;
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.Arrays;
-import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
- * Simple BLOB implementation. Actually there is no such entity as BLOB in 
Ignite. So using arrays is preferable way
+ * Simple BLOB implementation. Actually there is no such entity as BLOB in 
Ignite. So using arrays is a preferable way
  * to work with binary objects.
  *
- * This implementation can be useful for reading binary fields of objects 
through JDBC.
+ * <p>This implementation can be useful for writing and reading binary fields 
of objects through JDBC.
  */
 public class JdbcBlob implements Blob {
-    /** Byte array. */
-    private byte[] arr;
+    /** Buffer to store actual data. */
+    private JdbcBinaryBuffer buf;
 
     /**
+     * Create empty Blob.
+     *
+     * <p>It's supposed to be called when client application creates Blob 
calling the
+     * {@link java.sql.Connection#createBlob}.
+     */
+    public JdbcBlob() {
+        buf = JdbcBinaryBuffer.createReadWrite();
+    }
+
+    /**
+     * Create Blob which wraps the existing data stored in the buffer.
+     *
+     * <p>It's supposed to be called to create Blob for query result in the 
{@link java.sql.ResultSet}.
+     *
+     * @param buf Existing buffer with data.
+     */
+    public JdbcBlob(JdbcBinaryBuffer buf) {
+        this.buf = buf;
+    }
+
+    /**
+     * Create Blob which wraps the existing byte array.
+     *
      * @param arr Byte array.
      */
     public JdbcBlob(byte[] arr) {
-        this.arr = arr;
+        this(JdbcBinaryBuffer.createReadWrite(arr));
     }
 
     /** {@inheritDoc} */
     @Override public long length() throws SQLException {
         ensureNotClosed();
 
-        return arr.length;
+        return buf.length();
     }
 
     /** {@inheritDoc} */
     @Override public byte[] getBytes(long pos, int len) throws SQLException {
         ensureNotClosed();
 
-        if (pos < 1 || (arr.length - pos < 0 && arr.length > 0) || len < 0)
+        // The upper bound must also hold for an empty Blob (only pos == 1 is accepted there and
+        // yields an empty array). Otherwise the size computed below becomes negative and the
+        // array allocation fails with NegativeArraySizeException instead of SQLException.
+        if (pos < 1 || pos > Math.max(buf.length(), 1) || len < 0) {
             throw new SQLException("Invalid argument. Position can't be less 
than 1 or " +
-                "greater than size of underlying byte array. Requested length 
also can't be negative " + "" +
-                "[pos=" + pos + ", len=" + len + ']');
+                "greater than Blob length. Requested length also can't be 
negative " +
+                "[pos=" + pos + ", len=" + len + ", blobLen=" + buf.length() + 
"]");
+        }
 
         int idx = (int)(pos - 1);
 
-        int size = len > arr.length - idx ? arr.length - idx : len;
+        int size = Math.min(len, buf.length() - idx);
 
         byte[] res = new byte[size];
 
-        U.arrayCopy(arr, idx, res, 0, size);
+        if (size == 0)
+            return res;
+
+        buf.read(idx, res, 0, size);
 
         return res;
     }
@@ -74,55 +99,46 @@ public class JdbcBlob implements Blob {
     @Override public InputStream getBinaryStream() throws SQLException {
         ensureNotClosed();
 
-        return new ByteArrayInputStream(arr);
+        return buf.inputStream();
     }
 
     /** {@inheritDoc} */
     @Override public InputStream getBinaryStream(long pos, long len) throws 
SQLException {
         ensureNotClosed();
 
-        if (pos < 1 || len < 1 || pos > arr.length || len > arr.length - pos + 
1)
+        if (pos < 1 || len < 1 || pos > buf.length() || len > buf.length() - 
pos + 1) {
             throw new SQLException("Invalid argument. Position can't be less 
than 1 or " +
-                "greater than size of underlying byte array. Requested length 
can't be negative and can't be " +
-                "greater than available bytes from given position [pos=" + pos 
+ ", len=" + len + ']');
+                "greater than Blob length. Requested length can't be negative 
and can't be " +
+                "greater than available bytes from given position [pos=" + pos 
+ ", len=" + len + ", blobLen=" + buf.length() + "]");
+        }
 
-        return new ByteArrayInputStream(arr, (int)(pos - 1), (int)len);
+        return buf.inputStream((int)pos - 1, (int)len);
     }
 
     /** {@inheritDoc} */
     @Override public long position(byte[] ptrn, long start) throws 
SQLException {
         ensureNotClosed();
 
-        if (start < 1 || start > arr.length || ptrn.length == 0 || ptrn.length 
> arr.length)
-            return -1;
-
-        for (int i = 0, pos = (int)(start - 1); pos < arr.length;) {
-            if (arr[pos] == ptrn[i]) {
-                pos++;
-
-                i++;
-
-                if (i == ptrn.length)
-                    return pos - ptrn.length + 1;
-            }
-            else {
-                pos = pos - i + 1;
+        if (start < 1)
+            throw new SQLException("Invalid argument. Start position can't be 
less than 1 [start=" + start + "]");
 
-                i = 0;
-            }
-        }
+        if (start > buf.length() || ptrn.length == 0 || ptrn.length > 
buf.length())
+            return -1;
 
-        return -1;
+        return position(new ByteArrayInputStream(ptrn), ptrn.length, 
(int)start);
     }
 
     /** {@inheritDoc} */
     @Override public long position(Blob ptrn, long start) throws SQLException {
         ensureNotClosed();
 
-        if (start < 1 || start > arr.length || ptrn.length() == 0 || 
ptrn.length() > arr.length)
+        if (start < 1)
+            throw new SQLException("Invalid argument. Start position can't be 
less than 1 [start=" + start + "]");
+
+        if (start > buf.length() || ptrn.length() == 0 || ptrn.length() > 
buf.length())
             return -1;
 
-        return position(ptrn.getBytes(1, (int)ptrn.length()), start);
+        return position(ptrn.getBinaryStream(), (int)ptrn.length(), 
(int)start);
     }
 
     /** {@inheritDoc} */
@@ -134,57 +150,108 @@ public class JdbcBlob implements Blob {
     @Override public int setBytes(long pos, byte[] bytes, int off, int len) 
throws SQLException {
         ensureNotClosed();
 
-        if (pos < 1)
-            throw new SQLException("Invalid argument. Position can't be less 
than 1 [pos=" + pos + ']');
-
-        int idx = (int)(pos - 1);
-
-        if (pos - 1 > arr.length || off < 0 || off >= bytes.length || off + 
len > bytes.length)
-            throw new ArrayIndexOutOfBoundsException();
-
-        byte[] dst = arr;
-
-        if (idx + len > arr.length) {
-            dst = new byte[arr.length + (len - (arr.length - idx))];
-
-            U.arrayCopy(arr, 0, dst, 0, idx);
-
-            arr = dst;
+        if (pos < 1 || pos - 1 > buf.length()) {
+            throw new SQLException("Invalid argument. Position can't be less 
than 1 or " +
+                "greater than Blob length + 1 [pos=" + pos + ", blobLen=" + 
buf.length() + "]");
         }
 
-        U.arrayCopy(bytes, off, dst, idx, len);
+        try {
+            buf.write((int)pos - 1, bytes, off, len);
+        }
+        catch (IOException e) {
+            throw new SQLException(e);
+        }
 
         return len;
     }
 
     /** {@inheritDoc} */
     @Override public OutputStream setBinaryStream(long pos) throws 
SQLException {
-        throw new SQLFeatureNotSupportedException();
+        ensureNotClosed();
+
+        if (pos < 1 || pos - 1 > buf.length()) {
+            throw new SQLException("Invalid argument. Position can't be less 
than 1 or greater than Blob length + 1 " +
+                    "[pos=" + pos + ", blobLen=" + buf.length() + "]");
+        }
+
+        return buf.outputStream((int)pos - 1);
     }
 
     /** {@inheritDoc} */
     @Override public void truncate(long len) throws SQLException {
         ensureNotClosed();
 
-        if (len < 0 || len > arr.length)
+        if (len < 0 || len > buf.length()) {
             throw new SQLException("Invalid argument. Length can't be " +
-                "less than zero or greater than Blob length [len=" + len + 
']');
-
-        arr = Arrays.copyOf(arr, (int)len);
+                "less than zero or greater than Blob length [len=" + len + ", 
blobLen=" + buf.length() + "]");
+        }
 
+        buf.truncate((int)len);
     }
 
     /** {@inheritDoc} */
     @Override public void free() throws SQLException {
-        if (arr != null)
-            arr = null;
+        if (buf != null)
+            buf = null;
+    }
+
+    /**
+     * Actual implementation of the pattern search.
+     *
+     * <p>Reads the Blob data byte by byte starting from {@code idx} and compares it with the
+     * pattern stream. On a mismatch the pattern stream is reset and, if a partial match was
+     * in progress, the Blob stream is returned to the byte following the start of that match.
+     *
+     * <p>NOTE(review): {@code ptrn.reset()} is called without a preceding {@code mark()}, so the
+     * pattern stream presumably has to return to its beginning on reset - confirm for streams
+     * obtained from foreign {@link Blob} implementations.
+     *
+     * @param ptrn InputStream containing the pattern.
+     * @param ptrnLen Pattern length.
+     * @param idx 1-based index in Blob to start search from.
+     * @return 1-based position at which the pattern appears, else -1.
+     * @throws SQLException If reading from either stream fails with an {@link IOException}.
+     */
+    private long position(InputStream ptrn, int ptrnLen, int idx) throws 
SQLException {
+        try (InputStream blob = buf.inputStream(idx - 1, buf.length() - idx + 
1)) {
+            // True while a partial match is in progress.
+            boolean patternStarted = false;
+
+            // Number of pattern bytes matched so far.
+            int ptrnPos = 0;
+            // Zero-based count of Blob bytes consumed by the current attempt.
+            int blobPos = idx - 1;
+            int b;
+
+            while ((b = blob.read()) != -1) {
+                if (b == ptrn.read()) {
+                    if (!patternStarted) {
+                        patternStarted = true;
+
+                        // The first matching byte has already been read, so the mark points
+                        // to the byte right after the start of this candidate match.
+                        blob.mark(Integer.MAX_VALUE);
+                    }
+
+                    blobPos++;
+
+                    ptrnPos++;
+
+                    if (ptrnPos == ptrnLen)
+                        return blobPos - ptrnLen + 1;
+                }
+                else {
+                    // Next attempt starts one byte after the start of the failed one.
+                    blobPos = blobPos - ptrnPos + 1;
+
+                    ptrnPos = 0;
+                    ptrn.reset();
+
+                    if (patternStarted) {
+                        patternStarted = false;
+
+                        // Re-read the bytes consumed by the failed partial match.
+                        blob.reset();
+                    }
+                }
+            }
+
+            return -1;
+        }
+        catch (IOException e) {
+            throw new SQLException(e);
+        }
     }
 
     /**
      *
      */
     private void ensureNotClosed() throws SQLException {
-        if (arr == null)
+        if (buf == null)
             throw new SQLException("Blob instance can't be used after free() 
has been called.");
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlInputStreamWrapper.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlInputStreamWrapper.java
new file mode 100644
index 00000000000..0cb58237633
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlInputStreamWrapper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.io.InputStream;
+
+/**
+ * Pairs an {@link InputStream} with its (optionally known) data length so that the stream
+ * can be passed as an argument to PreparedStatement.
+ */
+public class SqlInputStreamWrapper {
+    /** Wrapped input stream. */
+    private final InputStream in;
+
+    /** Declared length of data in the input stream. -1 if unknown. */
+    private final int declaredLen;
+
+    /**
+     * Wrap a stream of unknown length.
+     *
+     * @param stream Input stream.
+     */
+    public SqlInputStreamWrapper(InputStream stream) {
+        this(stream, -1);
+    }
+
+    /**
+     * Wrap a stream together with its declared length.
+     *
+     * @param stream Input stream.
+     * @param len Length of data in the input stream. -1 if unknown.
+     */
+    public SqlInputStreamWrapper(InputStream stream, int len) {
+        in = stream;
+        declaredLen = len;
+    }
+
+    /**
+     * @return Length of data in the input stream. -1 if unknown.
+     */
+    public int length() {
+        return declaredLen;
+    }
+
+    /**
+     * @return Input stream for the enclosed data.
+     */
+    public InputStream inputStream() {
+        return in;
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
index de53c276ac5..be868c6ad20 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.odbc;
 
 import java.math.BigDecimal;
+import java.sql.Blob;
+import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.UUID;
@@ -28,6 +30,7 @@ import org.apache.ignite.internal.binary.BinaryReaderExImpl;
 import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
 import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.jdbc2.JdbcBinaryBuffer;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.util.typedef.F;
@@ -70,7 +73,21 @@ public abstract class SqlListenerUtils {
      * @throws BinaryObjectException On error.
      */
     @Nullable public static Object readObject(byte type, BinaryReaderExImpl 
reader, boolean binObjAllow,
-        boolean keepBinary) throws BinaryObjectException {
+                                              boolean keepBinary) throws 
BinaryObjectException {
+        return readObject(type, reader, binObjAllow, keepBinary, true);
+    }
+
+    /**
+     * @param type Object type.
+     * @param reader Reader.
+     * @param binObjAllow Allow to read non plaint objects.
+     * @param keepBinary Whether to deserialize objects or keep in binary 
format.
+     * @param createByteArrayCopy Whether to return new copy or copy-on-write 
buffer for byte array.
+     * @return Read object.
+     * @throws BinaryObjectException On error.
+     */
+    @Nullable public static Object readObject(byte type, BinaryReaderExImpl 
reader, boolean binObjAllow,
+        boolean keepBinary, boolean createByteArrayCopy) throws 
BinaryObjectException {
         switch (type) {
             case GridBinaryMarshaller.NULL:
                 return null;
@@ -121,7 +138,7 @@ public abstract class SqlListenerUtils {
                 return BinaryUtils.doReadBooleanArray(reader.in());
 
             case GridBinaryMarshaller.BYTE_ARR:
-                return BinaryUtils.doReadByteArray(reader.in());
+                return readByteArray(reader, createByteArrayCopy);
 
             case GridBinaryMarshaller.CHAR_ARR:
                 return BinaryUtils.doReadCharArray(reader.in());
@@ -174,6 +191,31 @@ public abstract class SqlListenerUtils {
         }
     }
 
+    /**
+     * Read byte array using the reader.
+     *
+     * <p>When a copy is requested, or the reader's input stream exposes no backing array, the
+     * data is read into a new byte array. Otherwise the array length is read, the stream
+     * position is moved past the array data and a read-only {@link JdbcBinaryBuffer} over
+     * the corresponding part of the stream's backing array is returned.
+     *
+     * @param reader Reader.
+     * @param createByteArrayCopy Whether create new byte array copy or try to create copy-on-write buffer.
+     * @return Either byte[] or {@link JdbcBinaryBuffer}.
+     */
+    private static Object readByteArray(BinaryReaderExImpl reader, boolean createByteArrayCopy) {
+        if (createByteArrayCopy || !reader.in().hasArray())
+            return BinaryUtils.doReadByteArray(reader.in());
+
+        int len = reader.in().readInt();
+
+        int start = reader.in().position();
+
+        // Skip the array data: it stays in the backing array and is only referenced by the buffer.
+        reader.in().position(start + len);
+
+        return JdbcBinaryBuffer.createReadOnly(reader.in().array(), start, len);
+    }
+
     /**
      * @param writer Writer.
      * @param obj Object to write.
@@ -246,12 +288,51 @@ public abstract class SqlListenerUtils {
             writer.writeTimestampArray((Timestamp[])obj);
         else if (cls == java.util.Date[].class || cls == java.sql.Date[].class)
             writer.writeDateArray((java.util.Date[])obj);
+        else if (obj instanceof SqlInputStreamWrapper)
+            writeByteArray(writer, (SqlInputStreamWrapper)obj);
+        else if (obj instanceof Blob)
+            writeByteArray(writer, (Blob)obj);
         else if (binObjAllow)
             writer.writeObjectDetached(obj);
         else
             throw new BinaryObjectException("Custom objects are not 
supported");
     }
 
+    /**
+     * Write byte array from the InputStream enclosed in the stream wrapper.
+     *
+     * @param writer Writer.
+     * @param wrapper Stream wrapper.
+     * @throws BinaryObjectException If the wrapper declares a length (not -1) that differs
+     *      from the number of bytes reported as written.
+     */
+    private static void writeByteArray(BinaryWriterExImpl writer, SqlInputStreamWrapper wrapper)
+        throws BinaryObjectException {
+        int declared = wrapper.length();
+
+        int written = writer.writeByteArray(wrapper.inputStream(), declared);
+
+        if (declared == -1 || declared == written)
+            return;
+
+        throw new BinaryObjectException("Input stream length mismatch. [declaredLength=" + declared + ", " +
+            "actualLength=" + written + "]");
+    }
+
+    /**
+     * Write byte array from the Blob instance.
+     *
+     * @param writer Writer.
+     * @param blob Blob.
+     * @throws BinaryObjectException If the Blob is longer than {@link Integer#MAX_VALUE} bytes,
+     *      if the number of bytes written differs from the Blob length, or if accessing
+     *      the Blob fails.
+     */
+    private static void writeByteArray(BinaryWriterExImpl writer, Blob blob) throws BinaryObjectException {
+        try {
+            // Read the length once so the value passed to the writer and the value verified
+            // afterwards are guaranteed to be the same.
+            long blobLen = blob.length();
+
+            // A plain narrowing cast would silently wrap such a length (2^32 - 1 would even become
+            // -1, the value the stream-wrapper overload passes for an unknown length).
+            if (blobLen > Integer.MAX_VALUE) {
+                throw new BinaryObjectException("Blob is too long to be written as a byte array. " +
+                    "[length=" + blobLen + "]");
+            }
+
+            int len = (int)blobLen;
+
+            int written = writer.writeByteArray(blob.getBinaryStream(), len);
+
+            if (len != written) {
+                throw new BinaryObjectException("Blob length mismatch. [declaredLength=" + len + ", " +
+                    "actualLength=" + written + "]");
+            }
+        }
+        catch (SQLException e) {
+            throw new BinaryObjectException(e);
+        }
+    }
+
     /**
      * @param cls Class.
      * @return {@code true} is the type is plain (not user's custom class).
@@ -284,7 +365,9 @@ public abstract class SqlListenerUtils {
             || cls == UUID[].class
             || cls == Time[].class
             || cls == Timestamp[].class
-            || cls == java.util.Date[].class || cls == java.sql.Date[].class;
+            || cls == java.util.Date[].class || cls == java.sql.Date[].class
+            || cls == SqlInputStreamWrapper.class
+            || cls == Blob.class;
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
index 74af149e45c..eb954287034 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
@@ -65,7 +65,7 @@ public class JdbcUtils {
                 List<Object> col = new ArrayList<>(colsSize);
 
                 for (int colCnt = 0; colCnt < colsSize; ++colCnt)
-                    col.add(readObject(reader, protoCtx));
+                    col.add(readObject(reader, protoCtx, false));
 
                 items.add(col);
             }
@@ -133,6 +133,22 @@ public class JdbcUtils {
             writer.writeInt(val);
     }
 
+    /**
+     * Read the type marker byte and then the object of that type from the reader.
+     *
+     * @param reader Reader.
+     * @param protoCtx Protocol context.
+     * @param createByteArrayCopy Whether to create new copy or copy-on-write buffer for byte array.
+     * @return Read object.
+     * @throws BinaryObjectException On error.
+     */
+    @Nullable public static Object readObject(
+            BinaryReaderExImpl reader,
+            JdbcProtocolContext protoCtx,
+            boolean createByteArrayCopy
+    ) throws BinaryObjectException {
+        byte type = reader.readByte();
+
+        boolean binObjAllow = protoCtx.isFeatureSupported(JdbcThinFeature.CUSTOM_OBJECT);
+        boolean keepBinary = protoCtx.keepBinary();
+
+        return SqlListenerUtils.readObject(type, reader, binObjAllow, keepBinary, createByteArrayCopy);
+    }
+
     /**
      * @param reader Reader.
      * @param protoCtx Protocol context.
@@ -143,8 +159,7 @@ public class JdbcUtils {
         BinaryReaderExImpl reader,
         JdbcProtocolContext protoCtx
     ) throws BinaryObjectException {
-        return SqlListenerUtils.readObject(reader.readByte(), reader,
-            protoCtx.isFeatureSupported(JdbcThinFeature.CUSTOM_OBJECT), 
protoCtx.keepBinary());
+        return readObject(reader, protoCtx, true);
     }
 
     /**


Reply via email to