This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 4009310f04 NIFI-12887 Added Binary String Format property to PutDatabaseRecord 4009310f04 is described below commit 4009310f0414719cd9c5f922b5b6330b8247aa3d Author: tpalfy <tpa...@apache.org> AuthorDate: Tue Mar 12 13:33:01 2024 +0100 NIFI-12887 Added Binary String Format property to PutDatabaseRecord Add option to treat Strings as hexadecimal character sequences or base64-encoded binary data when inserting into a binary type column. This closes #8558 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../processors/standard/PutDatabaseRecord.java | 42 +++++++++- .../processors/standard/PutDatabaseRecordTest.java | 89 +++++++++++++++++++++- 2 files changed, 129 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index fdad3fe56f..ef39491f4a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.io.BaseEncoding; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -238,6 +239,34 @@ public class PutDatabaseRecord extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final AllowableValue BINARY_STRING_FORMAT_UTF8 = new AllowableValue( + "UTF-8", + "UTF-8", + "String values for binary columns contain the original value as text via UTF-8 character encoding" + ); + + static final AllowableValue BINARY_STRING_FORMAT_HEX_STRING = new AllowableValue( + "Hexadecimal", + "Hexadecimal", + "String values for binary columns contain the original value in hexadecimal format" + ); + + static final AllowableValue BINARY_STRING_FORMAT_BASE64 = new AllowableValue( + "Base64", + "Base64", + "String values for binary columns contain the original value in Base64 encoded format" + ); + + static final PropertyDescriptor BINARY_STRING_FORMAT = new Builder() + .name("put-db-record-binary-format") + .displayName("Binary String Format") + .description("The format to be applied when decoding string values to binary.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .allowableValues(BINARY_STRING_FORMAT_UTF8, BINARY_STRING_FORMAT_HEX_STRING, BINARY_STRING_FORMAT_BASE64) + .defaultValue(BINARY_STRING_FORMAT_UTF8.getValue()) + .build(); + static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new Builder() .name("put-db-record-translate-field-names") .displayName("Translate Field Names") @@ -388,6 +417,7 @@ public class PutDatabaseRecord extends AbstractProcessor { pds.add(CATALOG_NAME); pds.add(SCHEMA_NAME); pds.add(TABLE_NAME); + pds.add(BINARY_STRING_FORMAT); pds.add(TRANSLATE_FIELD_NAMES); pds.add(UNMATCHED_FIELD_BEHAVIOR); pds.add(UNMATCHED_COLUMN_BEHAVIOR); @@ -619,6 +649,8 @@ public class PutDatabaseRecord extends AbstractProcessor { final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); final int timeoutMillis = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final String binaryStringFormat = context.getProperty(BINARY_STRING_FORMAT).evaluateAttributeExpressions(flowFile).getValue(); + // Ensure the table name has been set, the generated SQL statements (and TableSchema cache) will need it if (StringUtils.isEmpty(tableName)) { throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile)); @@ -777,7 +809,15 @@ public class PutDatabaseRecord extends AbstractProcessor { } currentValue = dest; } else if (currentValue instanceof String) { - currentValue = ((String) currentValue).getBytes(StandardCharsets.UTF_8); + final String stringValue = (String) currentValue; + + if (BINARY_STRING_FORMAT_BASE64.getValue().equals(binaryStringFormat)) { + currentValue = BaseEncoding.base64().decode(stringValue); + } else if (BINARY_STRING_FORMAT_HEX_STRING.getValue().equals(binaryStringFormat)) { + currentValue = BaseEncoding.base16().decode(stringValue.toUpperCase()); + } else { + currentValue = stringValue.getBytes(StandardCharsets.UTF_8); + } } else if (currentValue != null && !(currentValue instanceof byte[])) { throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY/VARBINARY/LONGVARBINARY"); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java index 107db3f688..3db54ec7e8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java @@ -61,6 +61,7 @@ import java.time.LocalDate; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1685,6 +1686,88 @@ public class PutDatabaseRecordTest { conn.close(); } + @Test + void testInsertHexStringIntoBinary() throws Exception { + runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_HEX_STRING); + + String tableName = "HEX_STRING_TEST"; + String createTable = "CREATE TABLE " + tableName + " (id integer primary key, binary_data blob)"; + String hexStringData = "abCDef"; + + recreateTable(tableName, createTable); + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("binaryData", RecordFieldType.STRING); + + parser.addRecord(1, hexStringData); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1); + final Connection conn = dbcp.getConnection(); + final Statement stmt = conn.createStatement(); + + final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + tableName); + assertTrue(resultSet.next()); + + assertEquals(1, resultSet.getInt(1)); + + Blob blob = resultSet.getBlob(2); + assertArrayEquals(new byte[]{(byte)171, (byte)205, (byte)239}, blob.getBytes(1, (int)blob.length())); + + stmt.close(); + conn.close(); + } + + @Test + void testInsertBase64StringIntoBinary() throws Exception { + runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, PutDatabaseRecord.BINARY_STRING_FORMAT_BASE64); + + String tableName = "BASE64_STRING_TEST"; + String createTable = "CREATE TABLE " + tableName + " (id integer primary key, binary_data blob)"; + byte[] binaryData = {(byte) 10, (byte) 103, (byte) 234}; + + recreateTable(tableName, createTable); + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("binaryData", RecordFieldType.STRING); + + parser.addRecord(1, Base64.getEncoder().encodeToString(binaryData)); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1); + final Connection conn = dbcp.getConnection(); + final Statement stmt = conn.createStatement(); + + final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + tableName); + assertTrue(resultSet.next()); + + assertEquals(1, resultSet.getInt(1)); + + Blob blob = resultSet.getBlob(2); + assertArrayEquals(binaryData, blob.getBytes(1, (int)blob.length())); + + stmt.close(); + conn.close(); + } + @Test void testInsertWithBlobClobObjectArraySource() throws Exception { String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," + @@ -1958,10 +2041,14 @@ public class PutDatabaseRecordTest { } private void recreateTable(String createSQL) throws ProcessException, SQLException { + recreateTable("PERSONS", createSQL); + } + + private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException { final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement(); try { - stmt.execute("drop table PERSONS"); + stmt.execute("drop table " + tableName); } catch (SQLException ignore) { // Do nothing, may not have existed }