This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 4009310f04 NIFI-12887 Added Binary String Format property to PutDatabaseRecord
4009310f04 is described below

commit 4009310f0414719cd9c5f922b5b6330b8247aa3d
Author: tpalfy <tpa...@apache.org>
AuthorDate: Tue Mar 12 13:33:01 2024 +0100

    NIFI-12887 Added Binary String Format property to PutDatabaseRecord
    
    Add option to treat Strings as hexadecimal character sequences or base64-encoded binary data when inserting into a binary type column.
    
    This closes #8558
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../processors/standard/PutDatabaseRecord.java     | 42 +++++++++-
 .../processors/standard/PutDatabaseRecordTest.java | 89 +++++++++++++++++++++-
 2 files changed, 129 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index fdad3fe56f..ef39491f4a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.io.BaseEncoding;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -238,6 +239,34 @@ public class PutDatabaseRecord extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    static final AllowableValue BINARY_STRING_FORMAT_UTF8 = new AllowableValue(
+            "UTF-8",
+            "UTF-8",
+            "String values for binary columns contain the original value as 
text via UTF-8 character encoding"
+    );
+
+    static final AllowableValue BINARY_STRING_FORMAT_HEX_STRING = new 
AllowableValue(
+            "Hexadecimal",
+            "Hexadecimal",
+            "String values for binary columns contain the original value in 
hexadecimal format"
+    );
+
+    static final AllowableValue BINARY_STRING_FORMAT_BASE64 = new 
AllowableValue(
+            "Base64",
+            "Base64",
+            "String values for binary columns contain the original value in 
Base64 encoded format"
+    );
+
+    static final PropertyDescriptor BINARY_STRING_FORMAT = new Builder()
+            .name("put-db-record-binary-format")
+            .displayName("Binary String Format")
+            .description("The format to be applied when decoding string values 
to binary.")
+            .required(false)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .allowableValues(BINARY_STRING_FORMAT_UTF8, 
BINARY_STRING_FORMAT_HEX_STRING, BINARY_STRING_FORMAT_BASE64)
+            .defaultValue(BINARY_STRING_FORMAT_UTF8.getValue())
+            .build();
+
     static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new Builder()
             .name("put-db-record-translate-field-names")
             .displayName("Translate Field Names")
@@ -388,6 +417,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         pds.add(CATALOG_NAME);
         pds.add(SCHEMA_NAME);
         pds.add(TABLE_NAME);
+        pds.add(BINARY_STRING_FORMAT);
         pds.add(TRANSLATE_FIELD_NAMES);
         pds.add(UNMATCHED_FIELD_BEHAVIOR);
         pds.add(UNMATCHED_COLUMN_BEHAVIOR);
@@ -619,6 +649,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
         final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
         final int timeoutMillis = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
 
+        final String binaryStringFormat = 
context.getProperty(BINARY_STRING_FORMAT).evaluateAttributeExpressions(flowFile).getValue();
+
         // Ensure the table name has been set, the generated SQL statements 
(and TableSchema cache) will need it
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException(format("Cannot process %s 
because Table Name is null or empty", flowFile));
@@ -777,7 +809,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
                                             }
                                             currentValue = dest;
                                         } else if (currentValue instanceof 
String) {
-                                            currentValue = ((String) 
currentValue).getBytes(StandardCharsets.UTF_8);
+                                            final String stringValue = 
(String) currentValue;
+
+                                            if 
(BINARY_STRING_FORMAT_BASE64.getValue().equals(binaryStringFormat)) {
+                                                currentValue = 
BaseEncoding.base64().decode(stringValue);
+                                            } else if 
(BINARY_STRING_FORMAT_HEX_STRING.getValue().equals(binaryStringFormat)) {
+                                                currentValue = 
BaseEncoding.base16().decode(stringValue.toUpperCase());
+                                            } else {
+                                                currentValue = 
stringValue.getBytes(StandardCharsets.UTF_8);
+                                            }
                                         } else if (currentValue != null && 
!(currentValue instanceof byte[])) {
                                             throw new 
IllegalTypeConversionException("Cannot convert value " + currentValue + " to 
BLOB/BINARY/VARBINARY/LONGVARBINARY");
                                         }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 107db3f688..3db54ec7e8 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -61,6 +61,7 @@ import java.time.LocalDate;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1685,6 +1686,88 @@ public class PutDatabaseRecordTest {
         conn.close();
     }
 
+    @Test
+    void testInsertHexStringIntoBinary() throws Exception {
+        runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, 
PutDatabaseRecord.BINARY_STRING_FORMAT_HEX_STRING);
+
+        String tableName = "HEX_STRING_TEST";
+        String createTable = "CREATE TABLE " + tableName + " (id integer 
primary key, binary_data blob)";
+        String hexStringData = "abCDef";
+
+        recreateTable(tableName, createTable);
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("binaryData", RecordFieldType.STRING);
+
+        parser.addRecord(1, hexStringData);
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+        final Connection conn = dbcp.getConnection();
+        final Statement stmt = conn.createStatement();
+
+        final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + 
tableName);
+        assertTrue(resultSet.next());
+
+        assertEquals(1, resultSet.getInt(1));
+
+        Blob blob = resultSet.getBlob(2);
+        assertArrayEquals(new byte[]{(byte)171, (byte)205, (byte)239}, 
blob.getBytes(1, (int)blob.length()));
+
+        stmt.close();
+        conn.close();
+    }
+
+    @Test
+    void testInsertBase64StringIntoBinary() throws Exception {
+        runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, 
PutDatabaseRecord.BINARY_STRING_FORMAT_BASE64);
+
+        String tableName = "BASE64_STRING_TEST";
+        String createTable = "CREATE TABLE " + tableName + " (id integer 
primary key, binary_data blob)";
+        byte[] binaryData = {(byte) 10, (byte) 103, (byte) 234};
+
+        recreateTable(tableName, createTable);
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("binaryData", RecordFieldType.STRING);
+
+        parser.addRecord(1, Base64.getEncoder().encodeToString(binaryData));
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+        final Connection conn = dbcp.getConnection();
+        final Statement stmt = conn.createStatement();
+
+        final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + 
tableName);
+        assertTrue(resultSet.next());
+
+        assertEquals(1, resultSet.getInt(1));
+
+        Blob blob = resultSet.getBlob(2);
+        assertArrayEquals(binaryData, blob.getBytes(1, (int)blob.length()));
+
+        stmt.close();
+        conn.close();
+    }
+
     @Test
     void testInsertWithBlobClobObjectArraySource() throws Exception {
         String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary 
key, name clob," +
@@ -1958,10 +2041,14 @@ public class PutDatabaseRecordTest {
     }
 
     private void recreateTable(String createSQL) throws ProcessException, 
SQLException {
+        recreateTable("PERSONS", createSQL);
+    }
+
+    private void recreateTable(String tableName, String createSQL) throws 
ProcessException, SQLException {
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
         try {
-            stmt.execute("drop table PERSONS");
+            stmt.execute("drop table " + tableName);
         } catch (SQLException ignore) {
             // Do nothing, may not have existed
         }

Reply via email to