This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 4009310f04 NIFI-12887 Added Binary String Format property to PutDatabaseRecord
4009310f04 is described below
commit 4009310f0414719cd9c5f922b5b6330b8247aa3d
Author: tpalfy <[email protected]>
AuthorDate: Tue Mar 12 13:33:01 2024 +0100
NIFI-12887 Added Binary String Format property to PutDatabaseRecord
Add an option to treat String values as hexadecimal character sequences or
Base64-encoded binary data when inserting into a binary type column. UTF-8
remains the default format, preserving the previous behavior.
This closes #8558
Signed-off-by: David Handermann <[email protected]>
---
.../processors/standard/PutDatabaseRecord.java | 42 +++++++++-
.../processors/standard/PutDatabaseRecordTest.java | 89 +++++++++++++++++++++-
2 files changed, 129 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index fdad3fe56f..ef39491f4a 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.io.BaseEncoding;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -238,6 +239,34 @@ public class PutDatabaseRecord extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ static final AllowableValue BINARY_STRING_FORMAT_UTF8 = new AllowableValue(
+ "UTF-8",
+ "UTF-8",
+ "String values for binary columns contain the original value as
text via UTF-8 character encoding"
+ );
+
+ static final AllowableValue BINARY_STRING_FORMAT_HEX_STRING = new
AllowableValue(
+ "Hexadecimal",
+ "Hexadecimal",
+ "String values for binary columns contain the original value in
hexadecimal format"
+ );
+
+ static final AllowableValue BINARY_STRING_FORMAT_BASE64 = new
AllowableValue(
+ "Base64",
+ "Base64",
+ "String values for binary columns contain the original value in
Base64 encoded format"
+ );
+
+ static final PropertyDescriptor BINARY_STRING_FORMAT = new Builder()
+ .name("put-db-record-binary-format")
+ .displayName("Binary String Format")
+ .description("The format to be applied when decoding string values
to binary.")
+ .required(false)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .allowableValues(BINARY_STRING_FORMAT_UTF8,
BINARY_STRING_FORMAT_HEX_STRING, BINARY_STRING_FORMAT_BASE64)
+ .defaultValue(BINARY_STRING_FORMAT_UTF8.getValue())
+ .build();
+
static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new Builder()
.name("put-db-record-translate-field-names")
.displayName("Translate Field Names")
@@ -388,6 +417,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
pds.add(CATALOG_NAME);
pds.add(SCHEMA_NAME);
pds.add(TABLE_NAME);
+ pds.add(BINARY_STRING_FORMAT);
pds.add(TRANSLATE_FIELD_NAMES);
pds.add(UNMATCHED_FIELD_BEHAVIOR);
pds.add(UNMATCHED_COLUMN_BEHAVIOR);
@@ -619,6 +649,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
final int maxBatchSize =
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
final int timeoutMillis =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final String binaryStringFormat =
context.getProperty(BINARY_STRING_FORMAT).evaluateAttributeExpressions(flowFile).getValue();
+
// Ensure the table name has been set, the generated SQL statements
(and TableSchema cache) will need it
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException(format("Cannot process %s
because Table Name is null or empty", flowFile));
@@ -777,7 +809,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
currentValue = dest;
} else if (currentValue instanceof
String) {
- currentValue = ((String)
currentValue).getBytes(StandardCharsets.UTF_8);
+ final String stringValue =
(String) currentValue;
+
+ if
(BINARY_STRING_FORMAT_BASE64.getValue().equals(binaryStringFormat)) {
+ currentValue =
BaseEncoding.base64().decode(stringValue);
+ } else if
(BINARY_STRING_FORMAT_HEX_STRING.getValue().equals(binaryStringFormat)) {
+ currentValue =
BaseEncoding.base16().decode(stringValue.toUpperCase());
+ } else {
+ currentValue =
stringValue.getBytes(StandardCharsets.UTF_8);
+ }
} else if (currentValue != null &&
!(currentValue instanceof byte[])) {
throw new
IllegalTypeConversionException("Cannot convert value " + currentValue + " to
BLOB/BINARY/VARBINARY/LONGVARBINARY");
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 107db3f688..3db54ec7e8 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -61,6 +61,7 @@ import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -1685,6 +1686,88 @@ public class PutDatabaseRecordTest {
conn.close();
}
+ @Test
+ void testInsertHexStringIntoBinary() throws Exception {
+ runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT,
PutDatabaseRecord.BINARY_STRING_FORMAT_HEX_STRING);
+
+ String tableName = "HEX_STRING_TEST";
+ String createTable = "CREATE TABLE " + tableName + " (id integer
primary key, binary_data blob)";
+ String hexStringData = "abCDef";
+
+ recreateTable(tableName, createTable);
+ final MockRecordParser parser = new MockRecordParser();
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("binaryData", RecordFieldType.STRING);
+
+ parser.addRecord(1, hexStringData);
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.INSERT_TYPE);
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName);
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+ final Connection conn = dbcp.getConnection();
+ final Statement stmt = conn.createStatement();
+
+ final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " +
tableName);
+ assertTrue(resultSet.next());
+
+ assertEquals(1, resultSet.getInt(1));
+
+ Blob blob = resultSet.getBlob(2);
+ assertArrayEquals(new byte[]{(byte)171, (byte)205, (byte)239},
blob.getBytes(1, (int)blob.length()));
+
+ stmt.close();
+ conn.close();
+ }
+
+ @Test
+ void testInsertBase64StringIntoBinary() throws Exception {
+ runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT,
PutDatabaseRecord.BINARY_STRING_FORMAT_BASE64);
+
+ String tableName = "BASE64_STRING_TEST";
+ String createTable = "CREATE TABLE " + tableName + " (id integer
primary key, binary_data blob)";
+ byte[] binaryData = {(byte) 10, (byte) 103, (byte) 234};
+
+ recreateTable(tableName, createTable);
+ final MockRecordParser parser = new MockRecordParser();
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("binaryData", RecordFieldType.STRING);
+
+ parser.addRecord(1, Base64.getEncoder().encodeToString(binaryData));
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.INSERT_TYPE);
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName);
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+ final Connection conn = dbcp.getConnection();
+ final Statement stmt = conn.createStatement();
+
+ final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " +
tableName);
+ assertTrue(resultSet.next());
+
+ assertEquals(1, resultSet.getInt(1));
+
+ Blob blob = resultSet.getBlob(2);
+ assertArrayEquals(binaryData, blob.getBytes(1, (int)blob.length()));
+
+ stmt.close();
+ conn.close();
+ }
+
@Test
void testInsertWithBlobClobObjectArraySource() throws Exception {
String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary
key, name clob," +
@@ -1958,10 +2041,14 @@ public class PutDatabaseRecordTest {
}
private void recreateTable(String createSQL) throws ProcessException,
SQLException {
+ recreateTable("PERSONS", createSQL);
+ }
+
+ private void recreateTable(String tableName, String createSQL) throws
ProcessException, SQLException {
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
try {
- stmt.execute("drop table PERSONS");
+ stmt.execute("drop table " + tableName);
} catch (SQLException ignore) {
// Do nothing, may not have existed
}