This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 9bd38a2456 NIFI-11691 Support VARBINARY and LONGVARBINARY in PutDatabaseRecord
9bd38a2456 is described below
commit 9bd38a24563117b9c96fc8a1f2979246cd0c10d0
Author: Matt Burgess <[email protected]>
AuthorDate: Wed Jun 14 15:48:41 2023 -0400
NIFI-11691 Support VARBINARY and LONGVARBINARY in PutDatabaseRecord
This closes #7380
Signed-off-by: David Handermann <[email protected]>
(cherry picked from commit 760949922c7098d09561af5c2eef7ede055c62bf)
---
.../processors/standard/PutDatabaseRecord.java | 35 +++++++++++++++--
.../processors/standard/PutDatabaseRecordTest.java | 45 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index f66b75e377..671e564d8c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -750,13 +750,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
targetDataType =
DataTypeUtils.getDataTypeFromSQLTypeValue(fieldSqlType);
}
if (targetDataType != null) {
- if (sqlType == Types.BLOB || sqlType ==
Types.BINARY) {
+ if (sqlType == Types.BLOB || sqlType ==
Types.BINARY || sqlType == Types.VARBINARY || sqlType == Types.LONGVARBINARY) {
if (currentValue instanceof Object[]) {
// Convert Object[Byte] arrays to
byte[]
Object[] src = (Object[])
currentValue;
if (src.length > 0) {
if (!(src[0] instanceof Byte))
{
- throw new
IllegalTypeConversionException("Cannot convert value " + currentValue + " to
BLOB/BINARY");
+ throw new
IllegalTypeConversionException("Cannot convert value " + currentValue + " to
BLOB/BINARY/VARBINARY/LONGVARBINARY");
}
}
byte[] dest = new byte[src.length];
@@ -767,7 +767,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
} else if (currentValue instanceof
String) {
currentValue = ((String)
currentValue).getBytes(StandardCharsets.UTF_8);
} else if (currentValue != null &&
!(currentValue instanceof byte[])) {
- throw new
IllegalTypeConversionException("Cannot convert value " + currentValue + " to
BLOB/BINARY");
+ throw new
IllegalTypeConversionException("Cannot convert value " + currentValue + " to
BLOB/BINARY/VARBINARY/LONGVARBINARY");
}
} else {
currentValue =
DataTypeUtils.convertType(
@@ -868,6 +868,35 @@ public class PutDatabaseRecord extends AbstractProcessor {
throw new IOException("Unable to parse data as CLOB/String
" + value, e.getCause());
}
}
+ } else if (sqlType == Types.VARBINARY || sqlType ==
Types.LONGVARBINARY) {
+ if (fieldSqlType == Types.ARRAY || fieldSqlType == Types.VARCHAR) {
+ if (!(value instanceof byte[])) {
+ if (value == null) {
+ try {
+ ps.setNull(index, Types.BLOB);
+ return;
+ } catch (SQLException e) {
+ throw new IOException("Unable to setNull() on
prepared statement" , e);
+ }
+ } else {
+ throw new IOException("Expected
VARBINARY/LONGVARBINARY to be of type byte[] but is instead " +
value.getClass().getName());
+ }
+ }
+ byte[] byteArray = (byte[]) value;
+ try {
+ ps.setBytes(index, byteArray);
+ } catch (SQLException e) {
+ throw new IOException("Unable to parse binary data with
size" + byteArray.length, e.getCause());
+ }
+ } else {
+ byte[] byteArray = new byte[0];
+ try {
+ byteArray =
value.toString().getBytes(StandardCharsets.UTF_8);
+ ps.setBytes(index, byteArray);
+ } catch (SQLException e) {
+ throw new IOException("Unable to parse binary data with
size" + byteArray.length, e.getCause());
+ }
+ }
} else {
try {
// If the specified field type is OTHER and the SQL type is
VARCHAR, the conversion went ok as a string literal but try the OTHER type when
setting the parameter. If an error occurs,
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 8de18420b1..903ba2fc92 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -65,6 +65,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -95,6 +96,8 @@ public class PutDatabaseRecordTest {
private static final String createUUIDSchema = "CREATE TABLE UUID_TEST (id
integer primary key, name VARCHAR(100))";
+ private static final String createLongVarBinarySchema = "CREATE TABLE
LONGVARBINARY_TEST (id integer primary key, name LONG VARCHAR FOR BIT DATA)";
+
private final static String DB_LOCATION = "target/db_pdr";
TestRunner runner;
@@ -1844,6 +1847,48 @@ public class PutDatabaseRecordTest {
conn.close();
}
+ @Test
+ void testInsertLongVarBinaryColumn() throws InitializationException,
ProcessException, SQLException {
+ // Manually create and drop the tables and schemas
+ final Connection conn = dbcp.getConnection();
+ final Statement stmt = conn.createStatement();
+ stmt.execute(createLongVarBinarySchema);
+
+ final MockRecordParser parser = new MockRecordParser();
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("name",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()).getFieldType());
+
+ byte[] longVarBinaryValue1 = new byte[] {97,98,99};
+ byte[] longVarBinaryValue2 = new byte[] {100,101,102};
+ parser.addRecord(1, longVarBinaryValue1);
+ parser.addRecord(2, longVarBinaryValue2);
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.INSERT_TYPE);
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, "LONGVARBINARY_TEST");
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+ ResultSet rs = stmt.executeQuery("SELECT * FROM LONGVARBINARY_TEST");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertArrayEquals(longVarBinaryValue1, rs.getBytes(2));
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertArrayEquals(longVarBinaryValue2, rs.getBytes(2));
+ assertFalse(rs.next());
+
+ // Drop the schemas here so as not to interfere with other tests
+ stmt.execute("drop table LONGVARBINARY_TEST");
+ stmt.close();
+ conn.close();
+ }
+
private void recreateTable() throws ProcessException {
try (final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement()) {