Repository: nifi
Updated Branches:
  refs/heads/master 52fa50cde -> 68291636c


NIFI-1841: Fixed support for CLOB/BLOB types

This closes #1035.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/68291636
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/68291636
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/68291636

Branch: refs/heads/master
Commit: 68291636cb3e823013c430ccbffbc449d178b89d
Parents: 52fa50c
Author: Matt Burgess <mattyb...@apache.org>
Authored: Tue Sep 20 11:35:53 2016 -0400
Committer: Bryan Bende <bbe...@apache.org>
Committed: Tue Sep 20 15:33:06 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/util/JdbcCommon.java    |  50 +++++++-
 .../standard/util/TestJdbcCommon.java           | 114 ++++++++++++++++++-
 2 files changed, 156 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/68291636/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 11ba141..b591d01 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -45,10 +45,13 @@ import static java.sql.Types.VARBINARY;
 import static java.sql.Types.VARCHAR;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.Clob;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -102,12 +105,55 @@ public class JdbcCommon {
                 }
                 for (int i = 1; i <= nrOfColumns; i++) {
                     final int javaSqlType = meta.getColumnType(i);
+
+                    // Need to handle CLOB and BLOB before getObject() is 
called, due to ResultSet's maximum portability statement
+                    if (javaSqlType == CLOB) {
+                        Clob clob = rs.getClob(i);
+                        if (clob != null) {
+                            long numChars = clob.length();
+                            char[] buffer = new char[(int) numChars];
+                            InputStream is = clob.getAsciiStream();
+                            int index = 0;
+                            int c = is.read();
+                            while (c > 0) {
+                                buffer[index++] = (char) c;
+                                c = is.read();
+                            }
+                            rec.put(i - 1, new String(buffer));
+                            clob.free();
+                        } else {
+                            rec.put(i - 1, null);
+                        }
+                        continue;
+                    }
+
+                    if (javaSqlType == BLOB) {
+                        Blob blob = rs.getBlob(i);
+                        if (blob != null) {
+                            long numChars = blob.length();
+                            byte[] buffer = new byte[(int) numChars];
+                            InputStream is = blob.getBinaryStream();
+                            int index = 0;
+                            int c = is.read();
+                            while (c > 0) {
+                                buffer[index++] = (byte) c;
+                                c = is.read();
+                            }
+                            ByteBuffer bb = ByteBuffer.wrap(buffer);
+                            rec.put(i - 1, bb);
+                            blob.free();
+                        } else {
+                            rec.put(i - 1, null);
+                        }
+                        continue;
+                    }
+
                     final Object value = rs.getObject(i);
 
                     if (value == null) {
                         rec.put(i - 1, null);
 
-                    } else if (javaSqlType == BINARY || javaSqlType == 
VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY || 
javaSqlType == BLOB || javaSqlType == CLOB) {
+                    } else if (javaSqlType == BINARY || javaSqlType == 
VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY) {
                         // bytes requires little bit different handling
                         byte[] bytes = rs.getBytes(i);
                         ByteBuffer bb = ByteBuffer.wrap(bytes);
@@ -211,6 +257,7 @@ public class JdbcCommon {
                 case NCHAR:
                 case NVARCHAR:
                 case VARCHAR:
+                case CLOB:
                     
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
@@ -277,7 +324,6 @@ public class JdbcCommon {
                 case LONGVARBINARY:
                 case ARRAY:
                 case BLOB:
-                case CLOB:
                     
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
                     break;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/68291636/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
index b66c178..dd375aa 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
@@ -19,17 +19,22 @@ package org.apache.nifi.processors.standard.util;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.CharArrayReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -38,12 +43,14 @@ import java.sql.Types;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.apache.commons.io.input.ReaderInputStream;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -160,10 +167,8 @@ public class TestJdbcCommon {
         System.out.println("Avro serialized result size in bytes: " + 
serializedBytes.length);
 
         st.close();
-        con.close();
 
         // Deserialize bytes to records
-
         final InputStream instream = new ByteArrayInputStream(serializedBytes);
 
         final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
@@ -243,8 +248,8 @@ public class TestJdbcCommon {
             }
         }
 
-        Assert.assertTrue(foundIntSchema);
-        Assert.assertTrue(foundNullSchema);
+        assertTrue(foundIntSchema);
+        assertTrue(foundNullSchema);
     }
 
     @Test
@@ -277,8 +282,8 @@ public class TestJdbcCommon {
             }
         }
 
-        Assert.assertTrue(foundLongSchema);
-        Assert.assertTrue(foundNullSchema);
+        assertTrue(foundLongSchema);
+        assertTrue(foundNullSchema);
     }
 
 
@@ -323,6 +328,103 @@ public class TestJdbcCommon {
         }
     }
 
+    @Test
+    public void testClob() throws Exception {
+        try (final Statement stmt = con.createStatement()) {
+            stmt.executeUpdate("CREATE TABLE clobtest (id INT, text CLOB(64 
K))");
+            stmt.execute("INSERT INTO blobtest VALUES (41, NULL)");
+            PreparedStatement ps = con.prepareStatement("INSERT INTO clobtest 
VALUES (?, ?)");
+            ps.setInt(1, 42);
+            final char[] buffer = new char[4002];
+            IntStream.range(0, 4002).forEach((i) -> buffer[i] = 
String.valueOf(i % 10).charAt(0));
+            ReaderInputStream isr = new ReaderInputStream(new 
CharArrayReader(buffer), Charset.defaultCharset());
+
+            // - set the value of the input parameter to the input stream
+            ps.setAsciiStream(2, isr, 4002);
+            ps.execute();
+            isr.close();
+
+            final ResultSet resultSet = stmt.executeQuery("select * from 
clobtest");
+
+            final ByteArrayOutputStream outStream = new 
ByteArrayOutputStream();
+            JdbcCommon.convertToAvroStream(resultSet, outStream, false);
+
+            final byte[] serializedBytes = outStream.toByteArray();
+            assertNotNull(serializedBytes);
+
+            // Deserialize bytes to records
+            final InputStream instream = new 
ByteArrayInputStream(serializedBytes);
+
+            final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+            try (final DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(instream, datumReader)) {
+                GenericRecord record = null;
+                while (dataFileReader.hasNext()) {
+                    // Reuse record object by passing it to next(). This saves 
us from
+                    // allocating and garbage collecting many objects for 
files with
+                    // many items.
+                    record = dataFileReader.next(record);
+                    Integer id = (Integer) record.get("ID");
+                    Object o = record.get("TEXT");
+                    if (id == 41) {
+                        assertNull(o);
+                    } else {
+                        assertNotNull(o);
+                        assertEquals(4002, o.toString().length());
+                    }
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testBlob() throws Exception {
+        try (final Statement stmt = con.createStatement()) {
+            stmt.executeUpdate("CREATE TABLE blobtest (id INT, b BLOB(64 K))");
+            stmt.execute("INSERT INTO blobtest VALUES (41, NULL)");
+            PreparedStatement ps = con.prepareStatement("INSERT INTO blobtest 
VALUES (?, ?)");
+            ps.setInt(1, 42);
+            final byte[] buffer = new byte[4002];
+            IntStream.range(0, 4002).forEach((i) -> buffer[i] = (byte) ((i % 
10) + 65));
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+
+            // - set the value of the input parameter to the input stream
+            ps.setBlob(2, bais, 4002);
+            ps.execute();
+            bais.close();
+
+            final ResultSet resultSet = stmt.executeQuery("select * from 
blobtest");
+
+            final ByteArrayOutputStream outStream = new 
ByteArrayOutputStream();
+            JdbcCommon.convertToAvroStream(resultSet, outStream, false);
+
+            final byte[] serializedBytes = outStream.toByteArray();
+            assertNotNull(serializedBytes);
+
+            // Deserialize bytes to records
+            final InputStream instream = new 
ByteArrayInputStream(serializedBytes);
+
+            final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+            try (final DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(instream, datumReader)) {
+                GenericRecord record = null;
+                while (dataFileReader.hasNext()) {
+                    // Reuse record object by passing it to next(). This saves 
us from
+                    // allocating and garbage collecting many objects for 
files with
+                    // many items.
+                    record = dataFileReader.next(record);
+                    Integer id = (Integer) record.get("ID");
+                    Object o = record.get("B");
+                    if (id == 41) {
+                        assertNull(o);
+                    } else {
+                        assertNotNull(o);
+                        assertTrue(o instanceof ByteBuffer);
+                        assertEquals(4002, ((ByteBuffer) o).array().length);
+                    }
+                }
+            }
+        }
+    }
+
 
     // many test use Derby as database, so ensure driver is available
     @Test

Reply via email to