This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 760949922c7098d09561af5c2eef7ede055c62bf
Author: Matt Burgess <[email protected]>
AuthorDate: Wed Jun 14 15:48:41 2023 -0400

    NIFI-11691 Support VARBINARY and LONGVARBINARY in PutDatabaseRecord
    
    This closes #7380
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/standard/PutDatabaseRecord.java     | 35 +++++++++++++++--
 .../processors/standard/PutDatabaseRecordTest.java | 45 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 3 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index f66b75e377..671e564d8c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -750,13 +750,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
                                     targetDataType = 
DataTypeUtils.getDataTypeFromSQLTypeValue(fieldSqlType);
                                 }
                                 if (targetDataType != null) {
-                                    if (sqlType == Types.BLOB || sqlType == 
Types.BINARY) {
+                                    if (sqlType == Types.BLOB || sqlType == 
Types.BINARY || sqlType == Types.VARBINARY || sqlType == Types.LONGVARBINARY) {
                                         if (currentValue instanceof Object[]) {
                                             // Convert Object[Byte] arrays to 
byte[]
                                             Object[] src = (Object[]) 
currentValue;
                                             if (src.length > 0) {
                                                 if (!(src[0] instanceof Byte)) 
{
-                                                    throw new 
IllegalTypeConversionException("Cannot convert value " + currentValue + " to 
BLOB/BINARY");
+                                                    throw new 
IllegalTypeConversionException("Cannot convert value " + currentValue + " to 
BLOB/BINARY/VARBINARY/LONGVARBINARY");
                                                 }
                                             }
                                             byte[] dest = new byte[src.length];
@@ -767,7 +767,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                                         } else if (currentValue instanceof 
String) {
                                             currentValue = ((String) 
currentValue).getBytes(StandardCharsets.UTF_8);
                                         } else if (currentValue != null && 
!(currentValue instanceof byte[])) {
-                                            throw new 
IllegalTypeConversionException("Cannot convert value " + currentValue + " to 
BLOB/BINARY");
+                                            throw new 
IllegalTypeConversionException("Cannot convert value " + currentValue + " to 
BLOB/BINARY/VARBINARY/LONGVARBINARY");
                                         }
                                     } else {
                                         currentValue = 
DataTypeUtils.convertType(
@@ -868,6 +868,35 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     throw new IOException("Unable to parse data as CLOB/String 
" + value, e.getCause());
                 }
             }
+        } else if (sqlType == Types.VARBINARY || sqlType == 
Types.LONGVARBINARY) {
+            if (fieldSqlType == Types.ARRAY || fieldSqlType == Types.VARCHAR) {
+                if (!(value instanceof byte[])) {
+                    if (value == null) {
+                        try {
+                            ps.setNull(index, Types.BLOB);
+                            return;
+                        } catch (SQLException e) {
+                            throw new IOException("Unable to setNull() on 
prepared statement" , e);
+                        }
+                    } else {
+                        throw new IOException("Expected 
VARBINARY/LONGVARBINARY to be of type byte[] but is instead " + 
value.getClass().getName());
+                    }
+                }
+                byte[] byteArray = (byte[]) value;
+                try {
+                    ps.setBytes(index, byteArray);
+                } catch (SQLException e) {
+                    throw new IOException("Unable to parse binary data with 
size" + byteArray.length, e.getCause());
+                }
+            } else {
+                byte[] byteArray = new byte[0];
+                try {
+                    byteArray = 
value.toString().getBytes(StandardCharsets.UTF_8);
+                    ps.setBytes(index, byteArray);
+                } catch (SQLException e) {
+                    throw new IOException("Unable to parse binary data with 
size" + byteArray.length, e.getCause());
+                }
+            }
         } else {
             try {
                 // If the specified field type is OTHER and the SQL type is 
VARCHAR, the conversion went ok as a string literal but try the OTHER type when 
setting the parameter. If an error occurs,
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 8de18420b1..903ba2fc92 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -65,6 +65,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -95,6 +96,8 @@ public class PutDatabaseRecordTest {
 
     private static final String createUUIDSchema = "CREATE TABLE UUID_TEST (id 
integer primary key, name VARCHAR(100))";
 
+    private static final String createLongVarBinarySchema = "CREATE TABLE 
LONGVARBINARY_TEST (id integer primary key, name LONG VARCHAR FOR BIT DATA)";
+
     private final static String DB_LOCATION = "target/db_pdr";
 
     TestRunner runner;
@@ -1844,6 +1847,48 @@ public class PutDatabaseRecordTest {
         conn.close();
     }
 
+    @Test
+    void testInsertLongVarBinaryColumn() throws InitializationException, 
ProcessException, SQLException {
+        // Verifies that byte[] record values are inserted into a LONG VARCHAR FOR BIT DATA column; the table is created here and dropped at the end of this test
+        final Connection conn = dbcp.getConnection();
+        final Statement stmt = conn.createStatement();
+        stmt.execute(createLongVarBinarySchema);
+
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("name", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()).getFieldType());
+
+        byte[] longVarBinaryValue1 = new byte[] {97,98,99}; // ASCII codes for "abc"
+        byte[] longVarBinaryValue2 = new byte[] {100,101,102}; // ASCII codes for "def"
+        parser.addRecord(1, longVarBinaryValue1);
+        parser.addRecord(2, longVarBinaryValue2);
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "LONGVARBINARY_TEST");
+
+        runner.enqueue(new byte[0]); // FlowFile content is empty; the records to insert were added to the parser above
+        runner.run();
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1); // exactly one FlowFile is expected on the success relationship
+        ResultSet rs = stmt.executeQuery("SELECT * FROM LONGVARBINARY_TEST"); // the assertions below expect id 1 first, then id 2
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertArrayEquals(longVarBinaryValue1, rs.getBytes(2));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertArrayEquals(longVarBinaryValue2, rs.getBytes(2));
+        assertFalse(rs.next()); // no rows beyond the two that were inserted
+
+        // Drop the table here so as not to interfere with other tests
+        stmt.execute("drop table LONGVARBINARY_TEST");
+        stmt.close();
+        conn.close();
+    }
+
     private void recreateTable() throws ProcessException {
         try (final Connection conn = dbcp.getConnection();
             final Statement stmt = conn.createStatement()) {

Reply via email to