This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 05f48742673 IGNITE-23114 JDBC thin: add support for stream APIs for
BLOB (#11518)
05f48742673 is described below
commit 05f48742673cc0888a41c5798b0cde1fa6a3f61e
Author: Sergey Korotkov <[email protected]>
AuthorDate: Fri Nov 15 17:17:33 2024 +0700
IGNITE-23114 JDBC thin: add support for stream APIs for BLOB (#11518)
---
.../internal/jdbc2/JdbcBinaryBufferTest.java | 55 +++
.../apache/ignite/internal/jdbc2/JdbcBlobTest.java | 358 ++++++++++++++++-
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 2 +
.../ignite/jdbc/thin/JdbcThinBatchSelfTest.java | 27 +-
.../thin/JdbcThinPreparedStatementSelfTest.java | 243 ++++++++++--
.../jdbc/thin/JdbcThinResultSetSelfTest.java | 206 +++++++++-
.../ignite/internal/binary/BinaryWriterExImpl.java | 64 +++
.../binary/streams/BinaryAbstractOutputStream.java | 2 +-
.../internal/jdbc/thin/JdbcThinConnection.java | 2 +-
.../jdbc/thin/JdbcThinPreparedStatement.java | 36 +-
.../internal/jdbc/thin/JdbcThinResultSet.java | 49 ++-
.../ignite/internal/jdbc2/JdbcBinaryBuffer.java | 440 +++++++++++++++++++++
.../org/apache/ignite/internal/jdbc2/JdbcBlob.java | 195 ++++++---
.../processors/odbc/SqlInputStreamWrapper.java | 63 +++
.../internal/processors/odbc/SqlListenerUtils.java | 89 ++++-
.../internal/processors/odbc/jdbc/JdbcUtils.java | 21 +-
16 files changed, 1701 insertions(+), 151 deletions(-)
diff --git
a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBufferTest.java
b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBufferTest.java
new file mode 100644
index 00000000000..ea6f469ea47
--- /dev/null
+++
b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBufferTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.io.IOException;
+import org.junit.Test;
+
+import static
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
+import static org.apache.ignite.internal.jdbc2.JdbcBinaryBuffer.MIN_CAP;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.junit.Assert.assertEquals;
+
+/** */
+public class JdbcBinaryBufferTest {
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCapacity() throws Exception {
+ assertEquals(MIN_CAP, JdbcBinaryBuffer.capacity(10, 20));
+
+ assertEquals(MAX_ARRAY_SIZE, JdbcBinaryBuffer.capacity(10,
MAX_ARRAY_SIZE));
+
+ assertEquals(MIN_CAP * 16, JdbcBinaryBuffer.capacity(MIN_CAP, MIN_CAP
* 10));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWriteTooMuchData() throws Exception {
+ JdbcBinaryBuffer buf = JdbcBinaryBuffer.createReadWrite(new byte[10]);
+
+ assertThrows(null, () -> {
+ buf.write(MAX_ARRAY_SIZE, 1);
+
+ return null;
+ }, IOException.class, "Too much data. Can't write more then 2147483639
bytes.");
+ }
+}
diff --git
a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBlobTest.java
b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBlobTest.java
index 0b0119e48e9..3bd494e02af 100644
---
a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBlobTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBlobTest.java
@@ -19,16 +19,24 @@ package org.apache.ignite.internal.jdbc2;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Blob;
import java.sql.SQLException;
import java.util.Arrays;
import org.junit.Test;
+import static
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** */
public class JdbcBlobTest {
+ /** */
+ static final String ERROR_BLOB_FREE = "Blob instance can't be used after
free() has been called.";
+
/**
* @throws Exception If failed.
*/
@@ -155,6 +163,21 @@ public class JdbcBlobTest {
}
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testGetBinaryStreamReadMoreThenBlobSize() throws Exception {
+ byte[] arr = new byte[] {1, 2, 3, 4, 5};
+
+ JdbcBlob blob = new JdbcBlob(arr);
+
+ InputStream is = blob.getBinaryStream();
+ byte[] res = new byte[7];
+ assertEquals(5, is.read(res, 0, 7));
+ assertArrayEquals(new byte[] {1, 2, 3, 4, 5, 0, 0}, res);
+ }
+
/**
* @throws Exception If failed.
*/
@@ -228,6 +251,145 @@ public class JdbcBlobTest {
}
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testGetBinaryStreamWithParamsReadMoreThenStreamLimit() throws
Exception {
+ byte[] arr = new byte[] {1, 2, 3, 4, 5};
+
+ JdbcBlob blob = new JdbcBlob(arr);
+
+ InputStream is = blob.getBinaryStream(2, 3);
+ byte[] res = new byte[6];
+ assertEquals(3, is.read(res, 1, 5));
+ assertArrayEquals(new byte[] {0, 2, 3, 4, 0, 0}, res);
+ assertEquals(-1, is.read(res, 0, 1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testGetBinaryStreamReadFromTruncated() throws Exception {
+ byte[] arr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
+
+ JdbcBlob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(arr, 0,
arr.length));
+
+ InputStream is = blob.getBinaryStream(2, 4);
+
+ assertEquals(1, is.read());
+
+ blob.truncate(6);
+
+ assertEquals(2, is.read());
+
+ blob.truncate(2);
+
+ assertEquals(-1, is.read());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testGetBinaryStreamMarkSkipReset() throws Exception {
+ byte[] arr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8};
+
+ JdbcBlob blob = new JdbcBlob(JdbcBinaryBuffer.createReadWrite(arr));
+
+ InputStream is = blob.getBinaryStream(2, arr.length - 2);
+
+ assertEquals(1, is.read());
+ is.reset();
+ assertEquals(1, is.read());
+
+ assertEquals(2, is.read());
+ is.mark(1);
+ assertEquals(3, is.read());
+ assertEquals(4, is.read());
+ is.reset();
+ assertEquals(3, is.read());
+
+ assertEquals(0, is.skip(-1));
+ assertEquals(0, is.skip(0));
+ assertEquals(1, is.skip(1));
+ assertEquals(2, is.skip(2));
+ assertEquals(7, is.read());
+
+ is.reset();
+ assertEquals(3, is.read());
+
+ assertEquals(4, is.skip(100));
+ assertEquals(-1, is.read());
+
+ is.reset();
+ assertEquals(5, is.skip(Long.MAX_VALUE));
+ assertEquals(-1, is.read());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testGetBinaryStreamSeeChangesDoneAfterCreate() throws
Exception {
+ byte[] arr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8};
+
+ JdbcBlob blob = new JdbcBlob(JdbcBinaryBuffer.createReadWrite(arr));
+
+ InputStream is = blob.getBinaryStream();
+
+ assertEquals(0, is.read());
+ assertEquals(1, is.read());
+
+ OutputStream os = blob.setBinaryStream(3);
+ os.write(11);
+
+ assertEquals(11, is.read());
+
+ blob.setBytes(4, new byte[] {12});
+
+ assertEquals(12, is.read());
+
+ byte[] res = is.readAllBytes();
+ assertArrayEquals(new byte[] {4, 5, 6, 7, 8}, res);
+
+ blob.setBytes(blob.length() + 1, new byte[] {13, 14, 15});
+ assertEquals(13, is.read());
+ assertEquals(14, is.read());
+ assertEquals(15, is.read());
+ assertEquals(-1, is.read());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testGetBinaryStreamWithParamsSeeChangesDoneAfterCreate()
throws Exception {
+ byte[] arr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8};
+
+ JdbcBlob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(arr, 0,
arr.length));
+
+ InputStream is = blob.getBinaryStream(2, arr.length - 2);
+
+ assertEquals(1, is.read());
+
+ OutputStream os = blob.setBinaryStream(3);
+ os.write(11);
+
+ assertEquals(11, is.read());
+
+ blob.setBytes(4, new byte[] {12});
+
+ assertEquals(12, is.read());
+
+ byte[] res = is.readNBytes(4);
+ assertArrayEquals(new byte[] {4, 5, 6, 7}, res);
+
+ blob.setBytes(blob.length() + 1, new byte[] {13});
+ assertEquals(-1, is.read());
+ }
+
/**
* @throws Exception If failed.
*/
@@ -237,7 +399,7 @@ public class JdbcBlobTest {
JdbcBlob blob = new JdbcBlob(arr);
- assertEquals(-1, blob.position(new byte[] {1, 2, 3}, 0));
+ assertThrows(null, () -> blob.position(new byte[] {1, 2, 3}, 0),
SQLException.class, null);
assertEquals(-1, blob.position(new byte[] {1, 2, 3}, arr.length + 1));
assertEquals(-1, blob.position(new byte[0], 1));
assertEquals(-1, blob.position(new byte[17], 1));
@@ -251,6 +413,9 @@ public class JdbcBlobTest {
assertEquals(-1, blob.position(new byte[] {0, 2, 3}, 1));
assertEquals(-1, blob.position(new byte[] {1, 2, 4}, 1));
+ blob.setBytes(17, new byte[] {16, 16, 16, 33, 46});
+ assertEquals(18, blob.position(new byte[] {16, 16, 33}, 1));
+
blob.free();
try {
@@ -272,7 +437,7 @@ public class JdbcBlobTest {
JdbcBlob blob = new JdbcBlob(arr);
- assertEquals(-1, blob.position(new JdbcBlob(new byte[] {1, 2, 3}), 0));
+ assertThrows(null, () -> blob.position(new JdbcBlob(new byte[] {1, 2,
3}), 0), SQLException.class, null);
assertEquals(-1, blob.position(new JdbcBlob(new byte[] {1, 2, 3}),
arr.length + 1));
assertEquals(-1, blob.position(new JdbcBlob(new byte[0]), 1));
assertEquals(-1, blob.position(new JdbcBlob(new byte[17]), 1));
@@ -286,6 +451,9 @@ public class JdbcBlobTest {
assertEquals(-1, blob.position(new JdbcBlob(new byte[] {0, 2, 3}), 1));
assertEquals(-1, blob.position(new JdbcBlob(new byte[] {1, 2, 4}), 1));
+ blob.setBytes(17, new byte[] {16, 16, 16, 33, 46});
+ assertEquals(18, blob.position(new JdbcBlob(new byte[] {16, 16, 33}),
1));
+
blob.free();
try {
@@ -321,7 +489,7 @@ public class JdbcBlobTest {
fail();
}
- catch (ArrayIndexOutOfBoundsException e) {
+ catch (SQLException e) {
// No-op.
}
@@ -350,6 +518,52 @@ public class JdbcBlobTest {
}
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSetBytesRO() throws Exception {
+ byte[] roArr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
+
+ Blob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(roArr, 2, 4));
+
+ blob.setBytes(2, new byte[] {11, 22});
+
+ assertArrayEquals(new byte[] {2, 11, 22, 5}, blob.getBytes(1,
(int)blob.length()));
+
+ assertArrayEquals(new byte[] {0, 1, 2, 3, 4, 5, 6, 7}, roArr);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSetBytesRealloc() throws Exception {
+ byte[] roArr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
+
+ Blob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(roArr, 2, 4));
+
+ blob.setBytes(5, new byte[JdbcBinaryBuffer.MIN_CAP]);
+
+ assertEquals(JdbcBinaryBuffer.MIN_CAP + 4, blob.length());
+
+ assertArrayEquals(new byte[] {0, 1, 2, 3, 4, 5, 6, 7}, roArr);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSetBytesTooMuchData() throws Exception {
+ Blob blob = new JdbcBlob();
+
+ blob.setBytes(1, new byte[1]);
+
+ // Use fake one byte array.
+ assertThrows(null, () -> blob.setBytes(2, new byte[1], 0,
MAX_ARRAY_SIZE),
+ SQLException.class, "Too much data. Can't write more then
2147483639 bytes.");
+ }
+
/**
* @throws Exception If failed.
*/
@@ -373,7 +587,7 @@ public class JdbcBlobTest {
fail();
}
- catch (ArrayIndexOutOfBoundsException e) {
+ catch (SQLException e) {
// No-op.
}
@@ -382,7 +596,7 @@ public class JdbcBlobTest {
fail();
}
- catch (ArrayIndexOutOfBoundsException e) {
+ catch (IndexOutOfBoundsException e) {
// No-op.
}
@@ -391,7 +605,7 @@ public class JdbcBlobTest {
fail();
}
- catch (ArrayIndexOutOfBoundsException e) {
+ catch (IndexOutOfBoundsException e) {
// No-op.
}
@@ -471,10 +685,140 @@ public class JdbcBlobTest {
}
catch (SQLException e) {
// No-op.
- System.out.println();
}
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTruncateRO() throws Exception {
+ byte[] roArr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
+
+ Blob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(roArr, 2, 4));
+ assertArrayEquals(new byte[] {2, 3, 4, 5}, blob.getBytes(1,
(int)blob.length()));
+
+ blob.truncate(2);
+ assertArrayEquals(new byte[] {2, 3}, blob.getBytes(1,
(int)blob.length()));
+
+ assertArrayEquals(new byte[] {0, 1, 2, 3, 4, 5, 6, 7}, roArr);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFree() throws Exception {
+ Blob blob = new JdbcBlob(new byte[] {0, 1, 2, 3, 4, 5, 6, 7});
+
+ assertEquals(8, blob.length());
+
+ blob.free();
+
+ blob.free();
+
+ assertThrows(null, blob::length, SQLException.class, ERROR_BLOB_FREE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSetBinaryStream() throws Exception {
+ Blob blob = new JdbcBlob();
+
+ assertThrows(null, () -> blob.setBinaryStream(0), SQLException.class,
null);
+ assertThrows(null, () -> blob.setBinaryStream(2), SQLException.class,
null);
+
+ OutputStream os = blob.setBinaryStream(1);
+
+ os.write(0);
+ os.write(new byte[] {1, 2, 3, 4, 5, 6, 7});
+ os.close();
+ assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7}, blob.getBytes(1,
(int)blob.length()));
+
+ os = blob.setBinaryStream(3);
+ os.write(new byte[] {20, 21, 22});
+ os.write(23);
+ os.close();
+ assertArrayEquals(new byte[]{0, 1, 20, 21, 22, 23, 6, 7},
blob.getBytes(1, (int)blob.length()));
+
+ os = blob.setBinaryStream(7);
+ os.write(new byte[] {30, 31, 32});
+ os.write(33);
+ os.close();
+ assertArrayEquals(new byte[]{0, 1, 20, 21, 22, 23, 30, 31, 32, 33},
blob.getBytes(1, (int)blob.length()));
+
+ blob.free();
+ assertThrows(null, () -> blob.setBinaryStream(2L), SQLException.class,
ERROR_BLOB_FREE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSetBinaryStreamTooMuchData() throws Exception {
+ Blob blob = new JdbcBlob();
+
+ OutputStream os = blob.setBinaryStream(1);
+ os.write(1);
+
+ assertThrows(null, () -> {
+ // Use fake one byte array.
+ os.write(new byte[1], 0, MAX_ARRAY_SIZE);
+
+ return null;
+ }, IOException.class, "Too much data. Can't write more then 2147483639
bytes.");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSetBinaryStreamRO() throws Exception {
+ byte[] roArr = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
+
+ Blob blob = new JdbcBlob(JdbcBinaryBuffer.createReadOnly(roArr, 2, 4));
+ assertArrayEquals(new byte[] {2, 3, 4, 5}, blob.getBytes(1,
(int)blob.length()));
+
+ OutputStream os = blob.setBinaryStream(5);
+ os.write(new byte[] {11, 22});
+ os.close();
+ assertArrayEquals(new byte[] {2, 3, 4, 5, 11, 22}, blob.getBytes(1,
(int)blob.length()));
+
+ assertArrayEquals(new byte[] {0, 1, 2, 3, 4, 5, 6, 7}, roArr);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSetBinaryStreamWriteToTruncated() throws Exception {
+ Blob blob = new JdbcBlob();
+
+ OutputStream os = blob.setBinaryStream(1);
+
+ os.write(1);
+
+ blob.truncate(0);
+
+ assertThrows(null, () -> {
+ os.write(2);
+
+ return null;
+ }, IOException.class, "Writting beyond end of Blob, it probably was
truncated after OutputStream " +
+ "was created [pos=1, blobLength=0]");
+
+ assertThrows(null, () -> {
+ os.write(new byte[] {2});
+
+ return null;
+ }, IOException.class, "Writting beyond end of Blob, it probably was
truncated after OutputStream " +
+ "was created [pos=1, blobLength=0]");
+
+ os.close();
+ }
+
/**
* @param is Input stream.
*/
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index f5b133e5fad..bc9e274523c 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.jdbc.suite;
import java.security.Security;
import org.apache.ignite.common.RunningQueryInfoCheckInitiatorTest;
+import org.apache.ignite.internal.jdbc2.JdbcBinaryBufferTest;
import org.apache.ignite.internal.jdbc2.JdbcBlobTest;
import org.apache.ignite.internal.jdbc2.JdbcBulkLoadSelfTest;
import org.apache.ignite.internal.jdbc2.JdbcClobTest;
@@ -127,6 +128,7 @@ import org.junit.runners.Suite;
JdbcSchemaCaseSelfTest.class,
JdbcClobTest.class,
+ JdbcBinaryBufferTest.class,
JdbcBlobTest.class,
org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class,
JdbcThinStreamingNotOrderedSelfTest.class,
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
index d9664b8a089..a4b5f6e8383 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
@@ -17,7 +17,9 @@
package org.apache.ignite.jdbc.thin;
+import java.io.ByteArrayInputStream;
import java.sql.BatchUpdateException;
+import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -37,8 +39,8 @@ import static org.junit.Assert.assertArrayEquals;
*/
public class JdbcThinBatchSelfTest extends
JdbcThinAbstractDmlStatementSelfTest {
/** SQL query. */
- private static final String SQL_PREPARED = "insert into Person(_key, id,
firstName, lastName, age) values " +
- "(?, ?, ?, ?, ?)";
+ private static final String SQL_PREPARED = "insert into Person(_key, id,
firstName, lastName, age, data) values " +
+ "(?, ?, ?, ?, ?, ?)";
/** Statement. */
private Statement stmt;
@@ -494,6 +496,14 @@ public class JdbcThinBatchSelfTest extends
JdbcThinAbstractDmlStatementSelfTest
pstmt.setString(paramCnt++, "Lastname" + i);
pstmt.setInt(paramCnt++, 20 + i);
+ if (i % 2 == 0) {
+ Blob blob = conn.createBlob();
+ blob.setBytes(1, getBytes("White"));
+ pstmt.setBlob(paramCnt, blob);
+ }
+ else
+ pstmt.setBinaryStream(paramCnt, new
ByteArrayInputStream(getBytes("Black")));
+
pstmt.addBatch();
}
@@ -525,6 +535,14 @@ public class JdbcThinBatchSelfTest extends
JdbcThinAbstractDmlStatementSelfTest
pstmt.setString(paramCnt++, "Lastname" + i);
pstmt.setInt(paramCnt++, 20 + i);
+ if (i % 2 == 0) {
+ Blob blob = conn.createBlob();
+ blob.setBytes(1, getBytes("White" + i));
+ pstmt.setBlob(paramCnt, blob);
+ }
+ else
+ pstmt.setBinaryStream(paramCnt, new
ByteArrayInputStream(getBytes("Black" + i)));
+
pstmt.addBatch();
}
@@ -534,6 +552,7 @@ public class JdbcThinBatchSelfTest extends
JdbcThinAbstractDmlStatementSelfTest
pstmt.setString(paramCnt++, "Name" + FAILED_IDX);
pstmt.setString(paramCnt++, "Lastname" + FAILED_IDX);
pstmt.setInt(paramCnt++, 20 + FAILED_IDX);
+ pstmt.setBinaryStream(paramCnt, new
ByteArrayInputStream(getBytes("Black" + FAILED_IDX)));
pstmt.addBatch();
@@ -543,6 +562,9 @@ public class JdbcThinBatchSelfTest extends
JdbcThinAbstractDmlStatementSelfTest
pstmt.setString(paramCnt++, "Name" + FAILED_IDX + 1);
pstmt.setString(paramCnt++, "Lastname" + FAILED_IDX + 1);
pstmt.setInt(paramCnt++, 20 + FAILED_IDX + 1);
+ Blob blob = conn.createBlob();
+ blob.setBytes(1, getBytes("White" + FAILED_IDX + 1));
+ pstmt.setBlob(paramCnt, blob);
pstmt.addBatch();
@@ -917,6 +939,7 @@ public class JdbcThinBatchSelfTest extends
JdbcThinAbstractDmlStatementSelfTest
stmt.setString(paramCnt++, "Name" + personIdx);
stmt.setString(paramCnt++, "Lastname" + personIdx);
stmt.setInt(paramCnt++, 20 + personIdx);
+ stmt.setBinaryStream(paramCnt, new
ByteArrayInputStream(getBytes("White")));
}
/**
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java
index 7982980491d..6adbebfcd51 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPreparedStatementSelfTest.java
@@ -17,7 +17,10 @@
package org.apache.ignite.jdbc.thin;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Reader;
import java.io.Serializable;
import java.math.BigDecimal;
@@ -30,6 +33,7 @@ import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
@@ -67,6 +71,8 @@ import static java.sql.Types.TINYINT;
import static java.sql.Types.VARCHAR;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
@@ -130,7 +136,7 @@ public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest
o.bigVal = new BigDecimal(1);
o.strVal = "str";
o.arrVal = new byte[] {1};
- o.blobVal = new byte[] {1};
+ o.blobVal = new byte[] {1, 2, 3};
o.clobVal = "large str";
o.dateVal = new Date(1);
o.timeVal = new Time(1);
@@ -867,7 +873,7 @@ public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest
Blob blob = conn.createBlob();
- blob.setBytes(1, new byte[] {1});
+ blob.setBytes(1, new byte[] {1, 2, 3});
stmt.setBlob(1, blob);
@@ -886,6 +892,209 @@ public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest
assertFalse(rs.next());
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBlobWrittenViaOutputStream() throws Exception {
+ stmt = conn.prepareStatement(SQL_PART + " where blobVal is not
distinct from ?");
+
+ Blob blob = conn.createBlob();
+
+ try (OutputStream out = blob.setBinaryStream(1)) {
+ out.write(new byte[] {1, 2, 3});
+ }
+
+ stmt.setBlob(1, blob);
+
+ checkStmtExec(1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBlobSeeChangesDoneAfterAddToStatement() throws Exception {
+ stmt = conn.prepareStatement(SQL_PART + " where blobVal is not
distinct from ?");
+
+ Blob blob = conn.createBlob();
+
+ stmt.setBlob(1, blob);
+
+ try (OutputStream out = blob.setBinaryStream(1)) {
+ out.write(new byte[] {1});
+ out.write(2);
+ }
+
+ blob.setBytes(3, new byte[] {3});
+
+ checkStmtExec(1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBlobNull() throws Exception {
+ stmt = conn.prepareStatement(SQL_PART + " where blobVal is not
distinct from ?");
+
+ stmt.setBlob(1, (Blob)null);
+ checkStmtExec(2);
+
+ stmt.setNull(1, BLOB);
+ checkStmtExec(2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBinaryStreamKnownLength() throws Exception {
+ stmt = conn.prepareStatement(SQL_PART + " where blobVal is not
distinct from ?");
+
+ ByteArrayInputStream stream = new ByteArrayInputStream(new byte[]{1,
2, 3});
+
+ assertThrows(null, () -> {
+ stmt.setBinaryStream(1, stream, -1L);
+ return null;
+ }, SQLException.class, null);
+
+ assertThrows(null, () -> {
+ stmt.setBinaryStream(1, stream, -1);
+ return null;
+ }, SQLException.class, null);
+
+ assertThrows(null, () -> {
+ stmt.setBinaryStream(1, stream, (long)MAX_ARRAY_SIZE + 1);
+ return null;
+ }, SQLFeatureNotSupportedException.class, null);
+
+ assertThrows(null, () -> {
+ stmt.setBinaryStream(1, stream, MAX_ARRAY_SIZE + 1);
+ return null;
+ }, SQLFeatureNotSupportedException.class, null);
+
+ stmt.setBinaryStream(1, stream, 3);
+ checkStmtExec(1);
+
+ stream.reset();
+ stmt.setBinaryStream(1, stream, 3L);
+ checkStmtExec(1);
+
+ stream.reset();
+ stmt.setBinaryStream(1, stream, 10L);
+ assertThrows(null, () -> stmt.executeQuery(), SQLException.class,
null);
+
+ stmt.setBinaryStream(1, null, 0);
+ checkStmtExec(2);
+
+ stmt.setBinaryStream(1, null, 0L);
+ checkStmtExec(2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBinaryStreamUnknownLength() throws Exception {
+ stmt = conn.prepareStatement(SQL_PART + " where blobVal is not
distinct from ?");
+
+ ByteArrayInputStream stream = new ByteArrayInputStream(new byte[]{1,
2, 3});
+
+ stmt.setBinaryStream(1, stream);
+ checkStmtExec(1);
+
+ stmt.setBinaryStream(1, null);
+ checkStmtExec(2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBinaryStreamThrows() throws Exception {
+ stmt = conn.prepareStatement(SQL_PART + " where blobVal is not
distinct from ?");
+
+ stmt.setBinaryStream(1, new ThrowingInputStream());
+
+ assertThrows(null, () -> stmt.executeQuery(), SQLException.class,
null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBlobStreamKnownLength() throws Exception {
+ stmt = conn.prepareStatement(SQL_PART + " where blobVal is not
distinct from ?");
+
+ ByteArrayInputStream stream = new ByteArrayInputStream(new byte[]{1,
2, 3});
+
+ assertThrows(null, () -> {
+ stmt.setBlob(1, stream, -1L);
+ return null;
+ }, SQLException.class, null);
+
+ assertThrows(null, () -> {
+ stmt.setBlob(1, stream, (long)MAX_ARRAY_SIZE + 1);
+ return null;
+ }, SQLFeatureNotSupportedException.class, null);
+
+ stmt.setBlob(1, stream, 3L);
+ checkStmtExec(1);
+
+ stream.reset();
+ stmt.setBlob(1, stream, 10L);
+ assertThrows(null, () -> stmt.executeQuery(), SQLException.class,
null);
+
+ stmt.setBlob(1, null, 0L);
+ checkStmtExec(2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBlobStreamUnknownLength() throws Exception {
+ stmt = conn.prepareStatement(SQL_PART + " where blobVal is not
distinct from ?");
+
+ ByteArrayInputStream stream = new ByteArrayInputStream(new byte[]{1,
2, 3});
+
+ stmt.setBlob(1, stream);
+ checkStmtExec(1);
+
+ stmt.setBlob(1, (Blob)null);
+ checkStmtExec(2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBlobStreamThrows() throws Exception {
+ stmt = conn.prepareStatement(SQL_PART + " where blobVal is not
distinct from ?");
+
+ stmt.setBinaryStream(1, new ThrowingInputStream());
+
+ assertThrows(null, () -> stmt.executeQuery(), SQLException.class,
null);
+ }
+
+ /** */
+ private void checkStmtExec(int expectedId) throws SQLException {
+ ResultSet rs = stmt.executeQuery();
+
+ assertTrue(rs.next());
+ assertEquals(expectedId, rs.getInt("id"));
+ assertFalse(rs.next());
+ }
+
+ /** */
+ static class ThrowingInputStream extends InputStream {
+ /** {@inheritDoc} */
+ @Override public int read() throws IOException {
+ throw new IOException();
+ }
+ }
+
/**
* @throws Exception If failed.
*/
@@ -1089,36 +1298,6 @@ public class JdbcThinPreparedStatementSelfTest extends
JdbcThinAbstractSelfTest
}
});
- checkNotSupported(new RunnableX() {
- @Override public void runx() throws Exception {
- stmt.setBinaryStream(1, null);
- }
- });
-
- checkNotSupported(new RunnableX() {
- @Override public void runx() throws Exception {
- stmt.setBinaryStream(1, null, 0);
- }
- });
-
- checkNotSupported(new RunnableX() {
- @Override public void runx() throws Exception {
- stmt.setBinaryStream(1, null, 0L);
- }
- });
-
- checkNotSupported(new RunnableX() {
- @Override public void runx() throws Exception {
- stmt.setBlob(1, (InputStream)null);
- }
- });
-
- checkNotSupported(new RunnableX() {
- @Override public void runx() throws Exception {
- stmt.setBlob(1, null, 0L);
- }
- });
-
checkNotSupported(new RunnableX() {
@Override public void runx() throws Exception {
stmt.setCharacterStream(1, null);
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
index 60210c335db..016850d125e 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinResultSetSelfTest.java
@@ -144,7 +144,7 @@ public class JdbcThinResultSetSelfTest extends
JdbcThinAbstractSelfTest {
o.bigVal = new BigDecimal(1);
o.strVal = "1";
o.arrVal = new byte[] {1};
- o.blobVal = new byte[] {1};
+ o.blobVal = new byte[] {1, 2, 3};
o.clobVal = "str";
o.dateVal = new Date(1, 1, 1);
o.timeVal = new Time(1, 1, 1);
@@ -611,10 +611,63 @@ public class JdbcThinResultSetSelfTest extends
JdbcThinAbstractSelfTest {
assertTrue(rs.next());
Blob blob = rs.getBlob("blobVal");
- Assert.assertArrayEquals(blob.getBytes(1, (int)blob.length()), new
byte[] {1});
+ Assert.assertArrayEquals(blob.getBytes(1, (int)blob.length()), new
byte[] {1, 2, 3});
blob = rs.getBlob(16);
- Assert.assertArrayEquals(blob.getBytes(1, (int)blob.length()), new
byte[] {1});
+ Assert.assertArrayEquals(blob.getBytes(1, (int)blob.length()), new
byte[] {1, 2, 3});
+
+ assertFalse(rs.next());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBlobForNonBinaryType() throws Exception {
+ ResultSet rs = stmt.executeQuery(SQL);
+
+ assertTrue(rs.next());
+
+ assertThrows(null, () -> rs.getBlob("strVal"), SQLException.class,
"Cannot convert to Blob [colIdx=10]");
+
+ assertFalse(rs.next());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBlobChangeAfterSelect() throws Exception {
+ ResultSet rs = stmt.executeQuery(SQL);
+
+ assertTrue(rs.next());
+
+ Blob blob1 = rs.getBlob("blobVal");
+ Blob blob2 = rs.getBlob("blobVal");
+ Assert.assertArrayEquals(blob1.getBytes(1, (int)blob1.length()), new
byte[]{1, 2, 3});
+ Assert.assertArrayEquals(blob2.getBytes(1, (int)blob2.length()), new
byte[]{1, 2, 3});
+
+ blob1.setBytes(4, new byte[]{4});
+ blob2.truncate(2);
+
+ Assert.assertArrayEquals(blob1.getBytes(1, (int)blob1.length()), new
byte[]{1, 2, 3, 4});
+ Assert.assertArrayEquals(blob2.getBytes(1, (int)blob2.length()), new
byte[]{1, 2});
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBlobNull() throws Exception {
+ ResultSet rs = stmt.executeQuery("select blobVal from testObject where
id = 2");
+
+ assertTrue(rs.next());
+
+ Blob blob = rs.getBlob("blobVal");
+ Assert.assertNull(blob);
+
+ blob = rs.getBlob(1);
+ Assert.assertNull(blob);
assertFalse(rs.next());
}
@@ -637,6 +690,109 @@ public class JdbcThinResultSetSelfTest extends
JdbcThinAbstractSelfTest {
assertFalse(rs.next());
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBinaryStream() throws Exception {
+ ResultSet rs = stmt.executeQuery(SQL);
+
+ assertTrue(rs.next());
+
+ InputStream is = rs.getBinaryStream("blobVal");
+
+ assertEquals(1, is.read());
+
+ assertTrue(is.markSupported());
+ is.mark(100);
+
+ byte[] res = new byte[]{33, 33, 33};
+ assertEquals(1, is.read(res, 1, 1));
+ Assert.assertArrayEquals(res, new byte[] {33, 2, 33});
+
+ is.reset();
+
+ assertEquals(2, is.read(res));
+ Assert.assertArrayEquals(res, new byte[] {2, 3, 33});
+
+ is.reset();
+
+ assertEquals(0, is.skip(-1));
+ assertEquals(0, is.skip(0));
+ assertEquals(1, is.skip(1));
+ assertEquals(3, is.read());
+
+ assertEquals(-1, is.read());
+ assertEquals(-1, is.read(res));
+
+ assertFalse(rs.next());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBinaryStreamInstancesAreIndependent() throws Exception {
+ ResultSet rs = stmt.executeQuery(SQL);
+
+ assertTrue(rs.next());
+
+ InputStream is1 = rs.getBinaryStream(16);
+ InputStream is2 = rs.getBinaryStream(16);
+
+ assertEquals(1, is1.read());
+ assertEquals(1, is1.skip(1));
+
+ assertEquals(1, is2.read());
+ is2.mark(100);
+ assertEquals(2, is2.read());
+ assertEquals(3, is2.read());
+
+ is2.reset();
+
+ assertEquals(3, is1.read());
+
+ assertEquals(2, is2.read());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBinaryStreamForNonBinaryType() throws Exception {
+ byte[] expected = "1".getBytes();
+
+ ResultSet rs = stmt.executeQuery(SQL);
+
+ assertTrue(rs.next());
+
+ InputStream asStream = rs.getBinaryStream("strVal");
+ byte[] asBytes = rs.getBytes("strVal");
+
+ Assert.assertArrayEquals(expected, asStream.readAllBytes());
+ Assert.assertArrayEquals(expected, asBytes);
+
+ assertFalse(rs.next());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBinaryStreamNull() throws Exception {
+ ResultSet rs = stmt.executeQuery("select blobVal from testObject where
id = 2");
+
+ assertTrue(rs.next());
+
+ InputStream stream = rs.getBinaryStream("blobVal");
+ Assert.assertNull(stream);
+
+ stream = rs.getBinaryStream(1);
+ Assert.assertNull(stream);
+
+ assertFalse(rs.next());
+ }
+
/**
* @throws Exception If failed.
*/
@@ -753,6 +909,38 @@ public class JdbcThinResultSetSelfTest extends
JdbcThinAbstractSelfTest {
Assert.assertEquals("Result count mismatch", 1, cnt);
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testObjectForBinaryType() throws Exception {
+ ResultSet rs = stmt.executeQuery(SQL);
+
+ assertTrue(rs.next());
+
+ Object res = rs.getObject("blobVal");
+ assertTrue(res instanceof byte[]);
+
+ res = rs.getObject(16);
+ assertTrue(res instanceof byte[]);
+
+ res = rs.getObject("blobVal", byte[].class);
+ assertTrue(res instanceof byte[]);
+
+ res = rs.getObject(16, byte[].class);
+ assertTrue(res instanceof byte[]);
+
+ res = rs.getObject("blobVal", Blob.class);
+ assertTrue(res instanceof Blob);
+
+ res = rs.getObject(16, Blob.class);
+ assertTrue(res instanceof Blob);
+
+ assertThrows(null, () -> rs.getObject("strVal", Blob.class),
+ SQLException.class,
+ "Cannot convert to Blob [colIdx=10]");
+ }
+
/**
* @throws Exception If failed.
*/
@@ -854,18 +1042,6 @@ public class JdbcThinResultSetSelfTest extends
JdbcThinAbstractSelfTest {
}
});
- checkNotSupported(new RunnableX() {
- @Override public void runx() throws Exception {
- rs.getBinaryStream(1);
- }
- });
-
- checkNotSupported(new RunnableX() {
- @Override public void runx() throws Exception {
- rs.getBinaryStream("id");
- }
- });
-
checkNotSupported(new RunnableX() {
@Override public void runx() throws Exception {
rs.getCharacterStream(1);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
index a28c0682474..826bc7df9e0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.binary;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectOutput;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
@@ -42,6 +43,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
/**
* Binary writer implementation.
@@ -53,6 +55,9 @@ public class BinaryWriterExImpl implements BinaryWriter,
BinaryRawWriterEx, Obje
/** Initial capacity. */
private static final int INIT_CAP = 1024;
+ /** Default buffer size for reading from streams. */
+ public static final int DEFAULT_BUFFER_SIZE = 8 * 1024;
+
/** */
private final BinaryContext ctx;
@@ -1880,6 +1885,65 @@ public class BinaryWriterExImpl implements BinaryWriter,
BinaryRawWriterEx, Obje
fieldCnt++;
}
+ /**
+ * Write byte array from the InputStream.
+ *
+ * <p>If {@code limit} > 0 then no more than {@code limit} bytes will be
read and written.
+ * If {@code limit} == -1 then it will try to read and write all bytes.
+ *
+ * <p>In any case if actual number of bytes is greater than {@code
MAX_ARRAY_SIZE}
+ * then an exception will be thrown.
+ *
+ * @param in InputStream.
+ * @param limit Max length of data to be read from the stream or -1 if all
data should be read.
+ * @return Number of bytes written.
+ * @throws BinaryObjectException If an I/O error occurs or stream contains
more than {@code MAX_ARRAY_SIZE} bytes.
+ */
+ public int writeByteArray(InputStream in, int limit) throws
BinaryObjectException {
+ if (limit != -1)
+ out.unsafeEnsure(1 + 4 + limit);
+ else
+ out.unsafeEnsure(1 + 4);
+
+ out.unsafeWriteByte(GridBinaryMarshaller.BYTE_ARR);
+
+ int lengthPos = out.position();
+
+ out.position(lengthPos + 4);
+
+ int written = 0;
+
+ byte[] buf = new byte[limit > 0 ? Math.min(limit, DEFAULT_BUFFER_SIZE)
: DEFAULT_BUFFER_SIZE];
+
+ while (limit == -1 || written < limit) {
+ int read;
+ try {
+ read = limit > 0
+ ? in.read(buf, 0, Math.min(buf.length, limit -
written))
+ : in.read(buf, 0, buf.length);
+ }
+ catch (IOException e) {
+ throw new BinaryObjectException(e);
+ }
+
+ if (read == -1)
+ break;
+
+ if (read + written > MAX_ARRAY_SIZE)
+ throw new BinaryObjectException("Too much data. Can't write
more then " + MAX_ARRAY_SIZE + " bytes from stream.");
+
+ out.writeByteArray(buf, 0, read);
+
+ written += read;
+ }
+
+ out.position(lengthPos);
+ out.unsafeWriteInt(written);
+ out.position(out.position() + written);
+
+ return written;
+ }
+
/**
* @param schemaId Schema ID.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
index 165bb8801e2..3beb93844cc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
@@ -36,7 +36,7 @@ public abstract class BinaryAbstractOutputStream extends
BinaryAbstractStream
* OutOfMemoryError: Requested array size exceeds VM limit
* @see java.util.ArrayList#MAX_ARRAY_SIZE
*/
- protected static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+ public static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/** {@inheritDoc} */
@Override public void writeByte(byte val) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index 5f68115abba..0de9affbc1f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -876,7 +876,7 @@ public class JdbcThinConnection implements Connection {
@Override public Blob createBlob() throws SQLException {
ensureNotClosed();
- return new JdbcBlob(new byte[0]);
+ return new JdbcBlob();
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
index d64aaab423e..0aa55e6c824 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
@@ -39,6 +39,7 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
+import org.apache.ignite.internal.processors.odbc.SqlInputStreamWrapper;
import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcMetaParamsRequest;
@@ -46,6 +47,9 @@ import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcMetaParamsResult;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
+import static java.sql.Types.BINARY;
+import static
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
+
/**
* JDBC prepared statement implementation.
*/
@@ -213,9 +217,7 @@ public class JdbcThinPreparedStatement extends
JdbcThinStatement implements Prep
/** {@inheritDoc} */
@Override public void setBinaryStream(int paramIdx, InputStream x, int
length) throws SQLException {
- ensureNotClosed();
-
- throw new SQLFeatureNotSupportedException("Streams are not
supported.");
+ setBinaryStream(paramIdx, x, (long)length);
}
/** {@inheritDoc} */
@@ -305,7 +307,7 @@ public class JdbcThinPreparedStatement extends
JdbcThinStatement implements Prep
/** {@inheritDoc} */
@Override public void setBlob(int paramIdx, Blob x) throws SQLException {
- setBytes(paramIdx, x.getBytes(1, (int)x.length()));
+ setArgument(paramIdx, x);
}
/** {@inheritDoc} */
@@ -411,9 +413,7 @@ public class JdbcThinPreparedStatement extends
JdbcThinStatement implements Prep
/** {@inheritDoc} */
@Override public void setBlob(int paramIdx, InputStream inputStream, long
length) throws SQLException {
- ensureNotClosed();
-
- throw new SQLFeatureNotSupportedException("SQL-specific types are not
supported.");
+ setBinaryStream(paramIdx, inputStream, length);
}
/** {@inheritDoc} */
@@ -446,7 +446,18 @@ public class JdbcThinPreparedStatement extends
JdbcThinStatement implements Prep
@Override public void setBinaryStream(int paramIdx, InputStream x, long
length) throws SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Streams are not
supported.");
+ if (length < 0)
+ throw new SQLException("Invalid argument. Length should be greater
than 0.");
+
+ if (length > MAX_ARRAY_SIZE) {
+ throw new SQLFeatureNotSupportedException("Invalid argument.
InputStreams with length greater than " + MAX_ARRAY_SIZE +
+ " are not supported.");
+ }
+
+ if (x == null)
+ setNull(paramIdx, BINARY);
+ else
+ setArgument(paramIdx, new SqlInputStreamWrapper(x, (int)length));
}
/** {@inheritDoc} */
@@ -467,7 +478,10 @@ public class JdbcThinPreparedStatement extends
JdbcThinStatement implements Prep
@Override public void setBinaryStream(int paramIdx, InputStream x) throws
SQLException {
ensureNotClosed();
- throw new SQLFeatureNotSupportedException("Streams are not
supported.");
+ if (x == null)
+ setNull(paramIdx, BINARY);
+ else
+ setArgument(paramIdx, new SqlInputStreamWrapper(x));
}
/** {@inheritDoc} */
@@ -493,9 +507,7 @@ public class JdbcThinPreparedStatement extends
JdbcThinStatement implements Prep
/** {@inheritDoc} */
@Override public void setBlob(int paramIdx, InputStream inputStream)
throws SQLException {
- ensureNotClosed();
-
- throw new SQLFeatureNotSupportedException("SQL-specific types are not
supported.");
+ setBinaryStream(paramIdx, inputStream);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
index 1ba7affc761..58cb64c04e4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.jdbc.thin;
+import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
@@ -46,6 +47,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.ignite.internal.jdbc2.JdbcBinaryBuffer;
import org.apache.ignite.internal.jdbc2.JdbcBlob;
import org.apache.ignite.internal.jdbc2.JdbcClob;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
@@ -472,6 +474,8 @@ public class JdbcThinResultSet implements ResultSet {
if (cls == byte[].class)
return (byte[])val;
+ else if (cls == JdbcBinaryBuffer.class)
+ return ((JdbcBinaryBuffer)val).bytes();
else if (cls == Byte.class)
return new byte[] {(byte)val};
else if (cls == Short.class) {
@@ -587,9 +591,15 @@ public class JdbcThinResultSet implements ResultSet {
/** {@inheritDoc} */
@Override public InputStream getBinaryStream(int colIdx) throws
SQLException {
- ensureNotClosed();
+ Object val = getValue(colIdx);
+
+ if (val == null)
+ return null;
- throw new SQLFeatureNotSupportedException("Stream are not supported.");
+ if (val instanceof JdbcBinaryBuffer)
+ return ((JdbcBinaryBuffer)val).inputStream();
+ else
+ return new ByteArrayInputStream(getBytes(colIdx));
}
/** {@inheritDoc} */
@@ -699,9 +709,7 @@ public class JdbcThinResultSet implements ResultSet {
/** {@inheritDoc} */
@Override public InputStream getBinaryStream(String colLb) throws
SQLException {
- ensureNotClosed();
-
- throw new SQLFeatureNotSupportedException("Streams are not
supported.");
+ return getBinaryStream(findColumn(colLb));
}
/** {@inheritDoc} */
@@ -738,14 +746,19 @@ public class JdbcThinResultSet implements ResultSet {
/** {@inheritDoc} */
@Override public Object getObject(int colIdx) throws SQLException {
- return getValue(colIdx);
+ Object val = getValue(colIdx);
+
+ if (val instanceof JdbcBinaryBuffer)
+ return ((JdbcBinaryBuffer)val).bytes();
+ else
+ return val;
}
/** {@inheritDoc} */
@Override public Object getObject(String colLb) throws SQLException {
int colIdx = findColumn(colLb);
- return getValue(colIdx);
+ return getObject(colIdx);
}
/** {@inheritDoc} */
@@ -1298,12 +1311,22 @@ public class JdbcThinResultSet implements ResultSet {
/** {@inheritDoc} */
@Override public Blob getBlob(int colIdx) throws SQLException {
- return new JdbcBlob(getBytes(colIdx));
+ Object val = getValue(colIdx);
+
+ if (val == null)
+ return null;
+
+ if (!(val instanceof JdbcBinaryBuffer))
+ throw new SQLException("Cannot convert to Blob [colIdx=" + colIdx
+ "]");
+
+ return new JdbcBlob(((JdbcBinaryBuffer)val).shallowCopy());
}
/** {@inheritDoc} */
@Override public Clob getClob(int colIdx) throws SQLException {
- return new JdbcClob(getString(colIdx));
+ String str = getString(colIdx);
+
+ return str != null ? new JdbcClob(str) : null;
}
/** {@inheritDoc} */
@@ -1327,12 +1350,14 @@ public class JdbcThinResultSet implements ResultSet {
/** {@inheritDoc} */
@Override public Blob getBlob(String colLb) throws SQLException {
- return new JdbcBlob(getBytes(colLb));
+ return getBlob(findColumn(colLb));
}
/** {@inheritDoc} */
@Override public Clob getClob(String colLb) throws SQLException {
- return new JdbcClob(getString(colLb));
+ String str = getString(colLb);
+
+ return str != null ? new JdbcClob(str) : null;
}
/** {@inheritDoc} */
@@ -1813,6 +1838,8 @@ public class JdbcThinResultSet implements ResultSet {
return getTimestamp(colIdx);
else if (targetCls == byte[].class)
return getBytes(colIdx);
+ else if (targetCls == Blob.class)
+ return getBlob(colIdx);
else if (targetCls == URL.class)
return getURL(colIdx);
else {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBuffer.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBuffer.java
new file mode 100644
index 00000000000..d9a57d47cb0
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBinaryBuffer.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Objects;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static
org.apache.ignite.internal.binary.streams.BinaryAbstractOutputStream.MAX_ARRAY_SIZE;
+
+/**
+ * Buffer storing the binary data.
+ *
+ * <p>Buffer can start working in read-only mode if created wrapping the
existing byte array which
+ * can not be modified. Any write operation switches it lazily to the
read-write mode. This allows
+ * to prevent the unnecessary data copying.
+ *
+ * <p>Data may be read via the InputStream API and modified via the
OutputStream one. Changes done via
+ * OutputStream are visible via the InputStream even if InputStream is created
before changes done.
+ *
+ * <p>InputStream and OutputStream created remain valid even if the underlying
data storage changed from
+ * read-only to read-write.
+ *
+ * <p>Note however that implementation is not thread-safe.
+ */
+public class JdbcBinaryBuffer {
+ /** Byte array storing data. */
+ private byte[] arr;
+
+ /** Offset the data starts in the array. */
+ private int off;
+
+ /** The length of data. */
+ private int len;
+
+ /** Read only flag. */
+ private boolean isReadOnly;
+
+ /** Minimum buffer capacity. */
+ public static final int MIN_CAP = 256;
+
+ /**
+ * Create buffer which wraps the existing byte array and start working in
the read-only mode.
+ *
+ * @param arr The byte array to be wrapped.
+ * @param off The offset to the first byte to be wrapped.
+ * @param len The length in bytes of the data to be wrapped.
+ */
+ public static JdbcBinaryBuffer createReadOnly(byte[] arr, int off, int
len) {
+ return new JdbcBinaryBuffer(arr, off, len, true);
+ }
+
+ /**
+ * Create buffer which takes ownership of and wraps data in the existing
byte array and
+ * starts working in the read-write mode.
+ *
+ * @param arr The byte array to be wrapped.
+ */
+ public static JdbcBinaryBuffer createReadWrite(byte[] arr) {
+ return new JdbcBinaryBuffer(arr, 0, arr.length, false);
+ }
+
+ /**
+ * Create empty buffer which starts working in the read-write mode.
+ */
+ public static JdbcBinaryBuffer createReadWrite() {
+ return new JdbcBinaryBuffer(new byte[MIN_CAP], 0, 0, false);
+ }
+
+ /**
+ * Private constructor.
+ *
+ * @param arr The byte array to be wrapped.
+ * @param off The offset to the first byte to be wrapped.
+ * @param len The length in bytes of the data to be wrapped.
+ * @param isReadOnly The read-only flag.
+ */
+ private JdbcBinaryBuffer(byte[] arr, int off, int len, boolean isReadOnly)
{
+ this.arr = arr;
+ this.off = off;
+ this.len = len;
+ this.isReadOnly = isReadOnly;
+ }
+
+ /**
+ * Create shallow read-only copy of this buffer.
+ */
+ public JdbcBinaryBuffer shallowCopy() {
+ return new JdbcBinaryBuffer(arr, off, len, true);
+ }
+
+ /**
+ * Provide InputStream through which the data can be read starting from the
+ * beginning.
+ *
+ * <p>Stream is not limited meaning that it would return any new data
+ * written to the buffer after stream creation.
+ *
+ * @return InputStream instance.
+ */
+ public InputStream inputStream() {
+ return new BufferInputStream();
+ }
+
+ /**
+ * Get copy of the buffer data as byte array.
+ *
+ * @return Byte array containing buffer data.
+ */
+ public byte[] bytes() {
+ byte[] bytes = new byte[len];
+
+ read(0, bytes, 0, len);
+
+ return bytes;
+ }
+
+ /**
+ * Provide OutputStream through which the data can be written to buffer
starting from
+ * the (zero-based) {@code pos} position.
+ *
+ * @param pos The zero-based offset to the first byte to be written. Must
not be negative
+ * or greater than total count of bytes in buffer.
+ *
+ * @return OutputStream instance.
+ */
+ OutputStream outputStream(int pos) {
+ return new BufferOutputStream(pos);
+ }
+
+ /**
+ * Provide InputStream through which the no more than {@code len} bytes
can be read
+ * from buffer starting from the specified zero-based position {@code pos}.
+ *
+ * @param pos The zero-based offset to the first byte to be retrieved.
Must not be negative
+ * or greater than total count of bytes in buffer.
+ * @param len The length in bytes of the data to be retrieved. Must not be
negative.
+ * @return InputStream instance.
+ */
+ InputStream inputStream(int pos, int len) {
+ return new BufferInputStream(pos, pos + len);
+ }
+
+ /**
+ * Truncate data in this buffer to specified length.
+ *
+ * @param len New length.
+ */
+ void truncate(int len) {
+ this.len = len;
+
+ reallocate(Math.max(MIN_CAP, len));
+ }
+
+ /**
+ * @return Length of data in this buffer.
+ */
+ int length() {
+ return len;
+ }
+
+ /**
+ * Read up to {@code resLen} bytes from this buffer from specified
position {@code pos}.
+ *
+ * @param pos Pointer to a position.
+ * @param resBuf Output byte array to write to.
+ * @param resOff Offset in the output array to start write to.
+ * @param resLen Number of bytes to read.
+ * @return Number of bytes read. -1 if end of data reached.
+ */
+ int read(int pos, byte[] resBuf, int resOff, int resLen) {
+ Objects.checkFromIndexSize(resOff, resLen, resBuf.length);
+
+ if (pos >= len)
+ return -1;
+
+ int bufOff = pos + off;
+
+ int size = Math.min(resLen, len - pos);
+
+ U.arrayCopy(arr, bufOff, resBuf, resOff, size);
+
+ return size;
+ }
+
+ /**
+ * Writes {@code inpLen} bytes from the specified byte array {@code bytes}
starting at offset {@code off}
+ * to this storage to specified position {@code pos}.
+ *
+ * @param pos Pointer to a position.
+ * @param inpBuf Input byte array.
+ * @param inpOff Start offset in the input array.
+ * @param inpLen Number of bytes to write.
+ */
+ void write(int pos, byte[] inpBuf, int inpOff, int inpLen) throws
IOException {
+ if (MAX_ARRAY_SIZE - pos < inpLen)
+ throw new IOException("Too much data. Can't write more then " +
MAX_ARRAY_SIZE + " bytes.");
+
+ Objects.checkFromIndexSize(inpOff, inpLen, inpBuf.length);
+
+ updateLength(Math.max(pos + inpLen, len));
+
+ U.arrayCopy(inpBuf, inpOff, arr, pos, inpLen);
+ }
+
+ /**
+ * Read a byte from this buffer from specified position {@code pos}.
+ *
+ * @param pos Position.
+ * @return Byte read from the Blob. -1 if end of data reached.
+ */
+ int read(int pos) {
+ if (pos >= len)
+ return -1;
+
+ return arr[pos + off] & 0xff;
+ }
+
+ /**
+ * Write a byte to this buffer to specified position {@code pos}.
+ *
+ * <p>The byte to be written is the eight low-order bits of the
+ * argument {@code b}. The 24 high-order bits of {@code b} are ignored.
+ *
+ * @param pos Pointer to a position.
+ * @param b Byte to write.
+ */
+ void write(int pos, int b) throws IOException {
+ if (MAX_ARRAY_SIZE - pos < 1)
+ throw new IOException("Too much data. Can't write more then " +
MAX_ARRAY_SIZE + " bytes.");
+
+ updateLength(Math.max(pos + 1, len));
+
+ arr[pos] = (byte)b;
+ }
+
+ /**
+ * Update data length. Reallocate underlining array if needed.
+ *
+ * @param newLen The new data length the buffer should be able to hold.
+ */
+ private void updateLength(int newLen) {
+ if (newLen > arr.length || isReadOnly)
+ reallocate(capacity(arr.length, newLen));
+
+ len = newLen;
+ }
+
+ /**
+ * Calculate new capacity.
+ *
+ * @param cap Current capacity.
+ * @param reqCap Required new capacity.
+ * @return New capacity.
+ */
+ protected static int capacity(int cap, int reqCap) {
+ if (reqCap <= MIN_CAP)
+ return MIN_CAP;
+
+ int resCap = Math.max(cap, MIN_CAP);
+
+ while (resCap < reqCap) {
+ resCap <<= 1;
+
+ if (resCap < 0)
+ return MAX_ARRAY_SIZE;
+ }
+
+ return resCap;
+ }
+
+ /**
+ * Allocate the new underlining array and copy data.
+ *
+ * @param newCapacity New capacity.
+ */
+ private void reallocate(int newCapacity) {
+ byte[] newBuf = new byte[newCapacity];
+
+ U.arrayCopy(arr, off, newBuf, 0, len);
+
+ arr = newBuf;
+ off = 0;
+ isReadOnly = false;
+ }
+
+ /**
+ * Input stream to read data from buffer.
+ */
+ private class BufferInputStream extends InputStream {
+ /** Max position in the buffer. -1 means no max position (for
unlimited stream). */
+ private final int limit;
+
+ /** Current position in the buffer. */
+ private int pos;
+
+ /** Remembered buffer position at the moment the {@link
InputStream#mark} is called. */
+ private int markedPos;
+
+ /**
+ * Create unlimited stream to read all data from the buffer starting
from the beginning.
+ */
+ private BufferInputStream() {
+ this(0, -1);
+ }
+
+ /**
+ * Create stream to read data from the buffer starting from the
specified {@code start}
+ * zero-based position.
+ *
+ * @param start The zero-based offset to the first byte to be
retrieved.
+ * @param limit The maximum length in bytes of the data to be
retrieved. Unlimited if -1.
+ */
+ private BufferInputStream(int start, int limit) {
+ pos = start;
+
+ markedPos = start;
+
+ this.limit = limit;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read() {
+ if (limit != -1 && pos >= limit)
+ return -1;
+
+ int res = JdbcBinaryBuffer.this.read(pos);
+
+ if (res != -1)
+ pos++;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(byte[] res, int off, int cnt) {
+ Objects.checkFromIndexSize(off, cnt, res.length);
+
+ int toRead = cnt;
+
+ if (limit != -1) {
+ if (pos >= limit)
+ return -1;
+
+ toRead = Math.min(limit - pos, cnt);
+ }
+
+ int read = JdbcBinaryBuffer.this.read(pos, res, off, toRead);
+
+ if (read != -1)
+ pos += read;
+
+ return read;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean markSupported() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void reset() {
+ pos = markedPos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void mark(int readlimit) {
+ markedPos = pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long skip(long n) {
+ if (n <= 0)
+ return 0;
+
+ int step = Math.min((int)Math.min(n, MAX_ARRAY_SIZE),
+ limit == -1 ? len - pos : limit - pos);
+
+ pos += step;
+
+ return step;
+ }
+ }
+
+ /**
+ * Output stream to write data to buffer.
+ */
+ private class BufferOutputStream extends OutputStream {
+ /** Current position in the buffer. */
+ private int pos;
+
+ /**
+ * Create stream to write data to the buffer starting from the
specified position {@code pos}.
+ *
+ * @param pos Starting position (zero-based).
+ */
+ private BufferOutputStream(int pos) {
+ this.pos = pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(int b) throws IOException {
+ if (pos > length()) {
+ throw new IOException("Writting beyond end of Blob, it
probably was truncated after OutputStream was created " +
+ "[pos=" + pos + ", blobLength=" + length() + "]");
+ }
+
+ JdbcBinaryBuffer.this.write(pos++, b);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(byte[] b, int off, int len) throws
IOException {
+ if (pos > length()) {
+ throw new IOException("Writting beyond end of Blob, it
probably was truncated after OutputStream was created " +
+ "[pos=" + pos + ", blobLength=" + length() + "]");
+ }
+
+ JdbcBinaryBuffer.this.write(pos, b, off, len);
+
+ pos += len;
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBlob.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBlob.java
index ef2b6b5bd3a..682c0212a20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBlob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBlob.java
@@ -18,54 +18,79 @@
package org.apache.ignite.internal.jdbc2;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Blob;
import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.Arrays;
-import org.apache.ignite.internal.util.typedef.internal.U;
/**
- * Simple BLOB implementation. Actually there is no such entity as BLOB in
Ignite. So using arrays is preferable way
+ * Simple BLOB implementation. Actually there is no such entity as BLOB in
Ignite. So using arrays is a preferable way
* to work with binary objects.
*
- * This implementation can be useful for reading binary fields of objects
through JDBC.
+ * <p>This implementation can be useful for writing and reading binary fields
of objects through JDBC.
*/
public class JdbcBlob implements Blob {
- /** Byte array. */
- private byte[] arr;
+ /** Buffer to store actual data. */
+ private JdbcBinaryBuffer buf;
/**
+ * Create empty Blob.
+ *
+ * <p>It's supposed to be called when client application creates Blob
calling the
+ * {@link java.sql.Connection#createBlob}.
+ */
+ public JdbcBlob() {
+ buf = JdbcBinaryBuffer.createReadWrite();
+ }
+
+ /**
+ * Create Blob which wraps the existing data stored in the buffer.
+ *
+ * <p>It's supposed to be called to create Blob for query result in the
{@link java.sql.ResultSet}.
+ *
+ * @param buf Existing buffer with data.
+ */
+ public JdbcBlob(JdbcBinaryBuffer buf) {
+ this.buf = buf;
+ }
+
+ /**
+ * Create Blob which wraps the existing byte array.
+ *
* @param arr Byte array.
*/
public JdbcBlob(byte[] arr) {
- this.arr = arr;
+ this(JdbcBinaryBuffer.createReadWrite(arr));
}
/** {@inheritDoc} */
@Override public long length() throws SQLException {
ensureNotClosed();
- return arr.length;
+ return buf.length();
}
/** {@inheritDoc} */
@Override public byte[] getBytes(long pos, int len) throws SQLException {
ensureNotClosed();
- if (pos < 1 || (arr.length - pos < 0 && arr.length > 0) || len < 0)
+ if (pos < 1 || (buf.length() - pos < 0 && buf.length() > 0) || len <
0) {
throw new SQLException("Invalid argument. Position can't be less
than 1 or " +
- "greater than size of underlying byte array. Requested length
also can't be negative " + "" +
- "[pos=" + pos + ", len=" + len + ']');
+ "greater than Blob length. Requested length also can't be
negative " +
+ "[pos=" + pos + ", len=" + len + ", blobLen=" + buf.length() +
"]");
+ }
int idx = (int)(pos - 1);
- int size = len > arr.length - idx ? arr.length - idx : len;
+ int size = Math.min(len, buf.length() - idx);
byte[] res = new byte[size];
- U.arrayCopy(arr, idx, res, 0, size);
+ if (size == 0)
+ return res;
+
+ buf.read(idx, res, 0, size);
return res;
}
@@ -74,55 +99,46 @@ public class JdbcBlob implements Blob {
@Override public InputStream getBinaryStream() throws SQLException {
ensureNotClosed();
- return new ByteArrayInputStream(arr);
+ return buf.inputStream();
}
/** {@inheritDoc} */
@Override public InputStream getBinaryStream(long pos, long len) throws
SQLException {
ensureNotClosed();
- if (pos < 1 || len < 1 || pos > arr.length || len > arr.length - pos +
1)
+ if (pos < 1 || len < 1 || pos > buf.length() || len > buf.length() -
pos + 1) {
throw new SQLException("Invalid argument. Position can't be less
than 1 or " +
- "greater than size of underlying byte array. Requested length
can't be negative and can't be " +
- "greater than available bytes from given position [pos=" + pos
+ ", len=" + len + ']');
+ "greater than Blob length. Requested length can't be negative
and can't be " +
+ "greater than available bytes from given position [pos=" + pos
+ ", len=" + len + ", blobLen=" + buf.length() + "]");
+ }
- return new ByteArrayInputStream(arr, (int)(pos - 1), (int)len);
+ return buf.inputStream((int)pos - 1, (int)len);
}
/** {@inheritDoc} */
@Override public long position(byte[] ptrn, long start) throws
SQLException {
ensureNotClosed();
- if (start < 1 || start > arr.length || ptrn.length == 0 || ptrn.length
> arr.length)
- return -1;
-
- for (int i = 0, pos = (int)(start - 1); pos < arr.length;) {
- if (arr[pos] == ptrn[i]) {
- pos++;
-
- i++;
-
- if (i == ptrn.length)
- return pos - ptrn.length + 1;
- }
- else {
- pos = pos - i + 1;
+ if (start < 1)
+ throw new SQLException("Invalid argument. Start position can't be
less than 1 [start=" + start + "]");
- i = 0;
- }
- }
+ if (start > buf.length() || ptrn.length == 0 || ptrn.length >
buf.length())
+ return -1;
- return -1;
+ return position(new ByteArrayInputStream(ptrn), ptrn.length,
(int)start);
}
/** {@inheritDoc} */
@Override public long position(Blob ptrn, long start) throws SQLException {
ensureNotClosed();
- if (start < 1 || start > arr.length || ptrn.length() == 0 ||
ptrn.length() > arr.length)
+ if (start < 1)
+ throw new SQLException("Invalid argument. Start position can't be
less than 1 [start=" + start + "]");
+
+ if (start > buf.length() || ptrn.length() == 0 || ptrn.length() >
buf.length())
return -1;
- return position(ptrn.getBytes(1, (int)ptrn.length()), start);
+ return position(ptrn.getBinaryStream(), (int)ptrn.length(),
(int)start);
}
/** {@inheritDoc} */
@@ -134,57 +150,108 @@ public class JdbcBlob implements Blob {
@Override public int setBytes(long pos, byte[] bytes, int off, int len)
throws SQLException {
ensureNotClosed();
- if (pos < 1)
- throw new SQLException("Invalid argument. Position can't be less
than 1 [pos=" + pos + ']');
-
- int idx = (int)(pos - 1);
-
- if (pos - 1 > arr.length || off < 0 || off >= bytes.length || off +
len > bytes.length)
- throw new ArrayIndexOutOfBoundsException();
-
- byte[] dst = arr;
-
- if (idx + len > arr.length) {
- dst = new byte[arr.length + (len - (arr.length - idx))];
-
- U.arrayCopy(arr, 0, dst, 0, idx);
-
- arr = dst;
+ if (pos < 1 || pos - 1 > buf.length()) {
+ throw new SQLException("Invalid argument. Position can't be less
than 1 or " +
+ "greater than Blob length + 1 [pos=" + pos + ", blobLen=" +
buf.length() + "]");
}
- U.arrayCopy(bytes, off, dst, idx, len);
+ try {
+ buf.write((int)pos - 1, bytes, off, len);
+ }
+ catch (IOException e) {
+ throw new SQLException(e);
+ }
return len;
}
/** {@inheritDoc} */
@Override public OutputStream setBinaryStream(long pos) throws
SQLException {
- throw new SQLFeatureNotSupportedException();
+ ensureNotClosed();
+
+ if (pos < 1 || pos - 1 > buf.length()) {
+ throw new SQLException("Invalid argument. Position can't be less
than 1 or greater than Blob length + 1 " +
+ "[pos=" + pos + ", blobLen=" + buf.length() + "]");
+ }
+
+ return buf.outputStream((int)pos - 1);
}
/** {@inheritDoc} */
@Override public void truncate(long len) throws SQLException {
ensureNotClosed();
- if (len < 0 || len > arr.length)
+ if (len < 0 || len > buf.length()) {
throw new SQLException("Invalid argument. Length can't be " +
- "less than zero or greater than Blob length [len=" + len +
']');
-
- arr = Arrays.copyOf(arr, (int)len);
+ "less than zero or greater than Blob length [len=" + len + ",
blobLen=" + buf.length() + "]");
+ }
+ buf.truncate((int)len);
}
/** {@inheritDoc} */
@Override public void free() throws SQLException {
- if (arr != null)
- arr = null;
+ if (buf != null)
+ buf = null;
+ }
+
+ /**
+ * Actual implementation of the pattern search.
+ *
+ * @param ptrn InputStream containing the pattern.
+ * @param ptrnLen Pattern length.
+ * @param idx 1-based index in Blob to start search from.
+ * @return 1-based position at which the pattern appears, else -1.
+ */
+ private long position(InputStream ptrn, int ptrnLen, int idx) throws
SQLException {
+ try (InputStream blob = buf.inputStream(idx - 1, buf.length() - idx +
1)) {
+ boolean patternStarted = false;
+
+ int ptrnPos = 0;
+ int blobPos = idx - 1;
+ int b;
+
+ while ((b = blob.read()) != -1) {
+ if (b == ptrn.read()) {
+ if (!patternStarted) {
+ patternStarted = true;
+
+ blob.mark(Integer.MAX_VALUE);
+ }
+
+ blobPos++;
+
+ ptrnPos++;
+
+ if (ptrnPos == ptrnLen)
+ return blobPos - ptrnLen + 1;
+ }
+ else {
+ blobPos = blobPos - ptrnPos + 1;
+
+ ptrnPos = 0;
+ ptrn.reset();
+
+ if (patternStarted) {
+ patternStarted = false;
+
+ blob.reset();
+ }
+ }
+ }
+
+ return -1;
+ }
+ catch (IOException e) {
+ throw new SQLException(e);
+ }
}
/**
*
*/
private void ensureNotClosed() throws SQLException {
- if (arr == null)
+ if (buf == null)
throw new SQLException("Blob instance can't be used after free()
has been called.");
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlInputStreamWrapper.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlInputStreamWrapper.java
new file mode 100644
index 00000000000..0cb58237633
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlInputStreamWrapper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.io.InputStream;
+
+/**
+ * InputStream wrapper used to pass it as argument to PreparedStatement.
+ */
+public class SqlInputStreamWrapper {
+ /** Input stream wrapped. */
+ private final InputStream stream;
+
+ /** Length of data in the input stream. -1 if unknown. */
+ private final int len;
+
+ /**
+ * @param stream Input stream.
+ * @param len Length of data in the input stream. -1 if unknown.
+ */
+ public SqlInputStreamWrapper(InputStream stream, int len) {
+ this.stream = stream;
+ this.len = len;
+ }
+
+ /**
+ * @param stream Input stream.
+ */
+ public SqlInputStreamWrapper(InputStream stream) {
+ this(stream, -1);
+ }
+
+ /**
+ * Returns input stream for the enclosed data.
+ *
+ * @return Input stream.
+ */
+ public InputStream inputStream() {
+ return stream;
+ }
+
+ /**
+ * @return Length of data in the input stream.
+ */
+ public int length() {
+ return len;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
index de53c276ac5..be868c6ad20 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerUtils.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.odbc;
import java.math.BigDecimal;
+import java.sql.Blob;
+import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.UUID;
@@ -28,6 +30,7 @@ import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.jdbc2.JdbcBinaryBuffer;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.util.typedef.F;
@@ -70,7 +73,21 @@ public abstract class SqlListenerUtils {
* @throws BinaryObjectException On error.
*/
@Nullable public static Object readObject(byte type, BinaryReaderExImpl
reader, boolean binObjAllow,
- boolean keepBinary) throws BinaryObjectException {
+ boolean keepBinary) throws
BinaryObjectException {
+ return readObject(type, reader, binObjAllow, keepBinary, true);
+ }
+
+ /**
+ * @param type Object type.
+ * @param reader Reader.
+ * @param binObjAllow Allow to read non-plain objects.
+ * @param keepBinary Whether to deserialize objects or keep in binary
format.
+ * @param createByteArrayCopy Whether to return new copy or copy-on-write
buffer for byte array.
+ * @return Read object.
+ * @throws BinaryObjectException On error.
+ */
+ @Nullable public static Object readObject(byte type, BinaryReaderExImpl
reader, boolean binObjAllow,
+ boolean keepBinary, boolean createByteArrayCopy) throws
BinaryObjectException {
switch (type) {
case GridBinaryMarshaller.NULL:
return null;
@@ -121,7 +138,7 @@ public abstract class SqlListenerUtils {
return BinaryUtils.doReadBooleanArray(reader.in());
case GridBinaryMarshaller.BYTE_ARR:
- return BinaryUtils.doReadByteArray(reader.in());
+ return readByteArray(reader, createByteArrayCopy);
case GridBinaryMarshaller.CHAR_ARR:
return BinaryUtils.doReadCharArray(reader.in());
@@ -174,6 +191,31 @@ public abstract class SqlListenerUtils {
}
}
+ /**
+ * Read byte array using the reader.
+ *
+ * <p>Returns either (eagerly) new instance of the byte array with all
data materialized,
+ * or {@link JdbcBinaryBuffer} which wraps part of the array enclosed in
+ * the reader's input stream in a copy-on-write manner.
+ *
+ * @param reader Reader.
+ * @param createByteArrayCopy Whether to create new byte array copy or try to
create copy-on-write buffer.
+ * @return Either byte[] or {@link JdbcBinaryBuffer}.
+ */
+ private static Object readByteArray(BinaryReaderExImpl reader, boolean
createByteArrayCopy) {
+ if (!createByteArrayCopy && reader.in().hasArray()) {
+ int len = reader.in().readInt();
+
+ int position = reader.in().position();
+
+ reader.in().position(position + len);
+
+ return JdbcBinaryBuffer.createReadOnly(reader.in().array(),
position, len);
+ }
+ else
+ return BinaryUtils.doReadByteArray(reader.in());
+ }
+
/**
* @param writer Writer.
* @param obj Object to write.
@@ -246,12 +288,51 @@ public abstract class SqlListenerUtils {
writer.writeTimestampArray((Timestamp[])obj);
else if (cls == java.util.Date[].class || cls == java.sql.Date[].class)
writer.writeDateArray((java.util.Date[])obj);
+ else if (obj instanceof SqlInputStreamWrapper)
+ writeByteArray(writer, (SqlInputStreamWrapper)obj);
+ else if (obj instanceof Blob)
+ writeByteArray(writer, (Blob)obj);
else if (binObjAllow)
writer.writeObjectDetached(obj);
else
throw new BinaryObjectException("Custom objects are not
supported");
}
+ /**
+ * Write byte array from the InputStream enclosed in the stream wrapper.
+ *
+ * @param writer Writer.
+ * @param wrapper Stream wrapper.
+ */
+ private static void writeByteArray(BinaryWriterExImpl writer,
SqlInputStreamWrapper wrapper) throws BinaryObjectException {
+ int written = writer.writeByteArray(wrapper.inputStream(),
wrapper.length());
+
+ if (wrapper.length() != -1 && wrapper.length() != written) {
+ throw new BinaryObjectException("Input stream length mismatch.
[declaredLength=" + wrapper.length() + ", " +
+ "actualLength=" + written + "]");
+ }
+ }
+
+ /**
+ * Write byte array from the Blob instance.
+ *
+ * @param writer Writer.
+ * @param blob Blob.
+ */
+ private static void writeByteArray(BinaryWriterExImpl writer, Blob blob)
throws BinaryObjectException {
+ try {
+ int written = writer.writeByteArray(blob.getBinaryStream(),
(int)blob.length());
+
+ if ((int)blob.length() != written) {
+ throw new BinaryObjectException("Blob length mismatch.
[declaredLength=" + (int)blob.length() + ", " +
+ "actualLength=" + written + "]");
+ }
+ }
+ catch (SQLException e) {
+ throw new BinaryObjectException(e);
+ }
+ }
+
/**
* @param cls Class.
* @return {@code true} is the type is plain (not user's custom class).
@@ -284,7 +365,9 @@ public abstract class SqlListenerUtils {
|| cls == UUID[].class
|| cls == Time[].class
|| cls == Timestamp[].class
- || cls == java.util.Date[].class || cls == java.sql.Date[].class;
+ || cls == java.util.Date[].class || cls == java.sql.Date[].class
+ || cls == SqlInputStreamWrapper.class
+ || cls == Blob.class;
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
index 74af149e45c..eb954287034 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
@@ -65,7 +65,7 @@ public class JdbcUtils {
List<Object> col = new ArrayList<>(colsSize);
for (int colCnt = 0; colCnt < colsSize; ++colCnt)
- col.add(readObject(reader, protoCtx));
+ col.add(readObject(reader, protoCtx, false));
items.add(col);
}
@@ -133,6 +133,22 @@ public class JdbcUtils {
writer.writeInt(val);
}
+ /**
+ * @param reader Reader.
+ * @param protoCtx Protocol context.
+ * @param createByteArrayCopy Whether to create new copy or copy-on-write
buffer for byte array.
+ * @return Read object.
+ * @throws BinaryObjectException On error.
+ */
+ @Nullable public static Object readObject(
+ BinaryReaderExImpl reader,
+ JdbcProtocolContext protoCtx,
+ boolean createByteArrayCopy
+ ) throws BinaryObjectException {
+ return SqlListenerUtils.readObject(reader.readByte(), reader,
+ protoCtx.isFeatureSupported(JdbcThinFeature.CUSTOM_OBJECT),
protoCtx.keepBinary(), createByteArrayCopy);
+ }
+
/**
* @param reader Reader.
* @param protoCtx Protocol context.
@@ -143,8 +159,7 @@ public class JdbcUtils {
BinaryReaderExImpl reader,
JdbcProtocolContext protoCtx
) throws BinaryObjectException {
- return SqlListenerUtils.readObject(reader.readByte(), reader,
- protoCtx.isFeatureSupported(JdbcThinFeature.CUSTOM_OBJECT),
protoCtx.keepBinary());
+ return readObject(reader, protoCtx, true);
}
/**