This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 760949922c7098d09561af5c2eef7ede055c62bf Author: Matt Burgess <[email protected]> AuthorDate: Wed Jun 14 15:48:41 2023 -0400 NIFI-11691 Support VARBINARY and LONGVARBINARY in PutDatabaseRecord This closes #7380 Signed-off-by: David Handermann <[email protected]> --- .../processors/standard/PutDatabaseRecord.java | 35 +++++++++++++++-- .../processors/standard/PutDatabaseRecordTest.java | 45 ++++++++++++++++++++++ 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index f66b75e377..671e564d8c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -750,13 +750,13 @@ public class PutDatabaseRecord extends AbstractProcessor { targetDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(fieldSqlType); } if (targetDataType != null) { - if (sqlType == Types.BLOB || sqlType == Types.BINARY) { + if (sqlType == Types.BLOB || sqlType == Types.BINARY || sqlType == Types.VARBINARY || sqlType == Types.LONGVARBINARY) { if (currentValue instanceof Object[]) { // Convert Object[Byte] arrays to byte[] Object[] src = (Object[]) currentValue; if (src.length > 0) { if (!(src[0] instanceof Byte)) { - throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY"); + throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY/VARBINARY/LONGVARBINARY"); } } byte[] dest = new byte[src.length]; @@ -767,7 +767,7 @@ public class PutDatabaseRecord extends AbstractProcessor { } else if (currentValue instanceof String) { currentValue = ((String) currentValue).getBytes(StandardCharsets.UTF_8); } else if (currentValue != null && !(currentValue instanceof byte[])) { - throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY"); + throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY/VARBINARY/LONGVARBINARY"); } } else { currentValue = DataTypeUtils.convertType( @@ -868,6 +868,35 @@ public class PutDatabaseRecord extends AbstractProcessor { throw new IOException("Unable to parse data as CLOB/String " + value, e.getCause()); } } + } else if (sqlType == Types.VARBINARY || sqlType == Types.LONGVARBINARY) { + if (fieldSqlType == Types.ARRAY || fieldSqlType == Types.VARCHAR) { + if (!(value instanceof byte[])) { + if (value == null) { + try { + ps.setNull(index, Types.BLOB); + return; + } catch (SQLException e) { + throw new IOException("Unable to setNull() on prepared statement" , e); + } + } else { + throw new IOException("Expected VARBINARY/LONGVARBINARY to be of type byte[] but is instead " + value.getClass().getName()); + } + } + byte[] byteArray = (byte[]) value; + try { + ps.setBytes(index, byteArray); + } catch (SQLException e) { + throw new IOException("Unable to parse binary data with size" + byteArray.length, e.getCause()); + } + } else { + byte[] byteArray = new byte[0]; + try { + byteArray = value.toString().getBytes(StandardCharsets.UTF_8); + ps.setBytes(index, byteArray); + } catch (SQLException e) { + throw new IOException("Unable to parse binary data with size" + byteArray.length, e.getCause()); + } + } } else { try { // If the specified field type is OTHER and the SQL type is VARCHAR, the conversion went ok as a string literal but try the OTHER type when setting the parameter. If an error occurs, diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java index 8de18420b1..903ba2fc92 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java @@ -65,6 +65,7 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -95,6 +96,8 @@ public class PutDatabaseRecordTest { private static final String createUUIDSchema = "CREATE TABLE UUID_TEST (id integer primary key, name VARCHAR(100))"; + private static final String createLongVarBinarySchema = "CREATE TABLE LONGVARBINARY_TEST (id integer primary key, name LONG VARCHAR FOR BIT DATA)"; + private final static String DB_LOCATION = "target/db_pdr"; TestRunner runner; @@ -1844,6 +1847,48 @@ public class PutDatabaseRecordTest { conn.close(); } + @Test + void testInsertLongVarBinaryColumn() throws InitializationException, ProcessException, SQLException { + // Manually create and drop the tables and schemas + final Connection conn = dbcp.getConnection(); + final Statement stmt = conn.createStatement(); + stmt.execute(createLongVarBinarySchema); + + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("name", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()).getFieldType()); + + byte[] longVarBinaryValue1 = new byte[] {97,98,99}; + byte[] longVarBinaryValue2 = new byte[] {100,101,102}; + parser.addRecord(1, longVarBinaryValue1); + parser.addRecord(2, longVarBinaryValue2); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, "LONGVARBINARY_TEST"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1); + ResultSet rs = stmt.executeQuery("SELECT * FROM LONGVARBINARY_TEST"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertArrayEquals(longVarBinaryValue1, rs.getBytes(2)); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertArrayEquals(longVarBinaryValue2, rs.getBytes(2)); + assertFalse(rs.next()); + + // Drop the schemas here so as not to interfere with other tests + stmt.execute("drop table LONGVARBINARY_TEST"); + stmt.close(); + conn.close(); + } + private void recreateTable() throws ProcessException { try (final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement()) {
