This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 10f1ab2396 NIFI-14297 Added support to Record Schema for Bytes to
Decimal Logical Type conversion (#9755)
10f1ab2396 is described below
commit 10f1ab239630867387975512f4630387dbbc100d
Author: dan-s1 <[email protected]>
AuthorDate: Tue Mar 4 16:16:11 2025 -0500
NIFI-14297 Added support to Record Schema for Bytes to Decimal Logical Type
conversion (#9755)
- Added support for default value as string in Avro Schema with bytes to
decimal Logical Type
Co-authored-by: David Handermann <[email protected]>
Signed-off-by: David Handermann <[email protected]>
---
.../serialization/record/util/DataTypeUtils.java | 5 ++
.../serialization/record/TestDataTypeUtils.java | 50 +++++++++------
.../processors/standard/TestConvertRecord.java | 72 ++++++++++++++++++++++
3 files changed, 109 insertions(+), 18 deletions(-)
diff --git
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 43b8e5ce8d..0e7b0c9b91 100644
---
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1282,6 +1282,10 @@ public class DataTypeUtils {
}
}
+ case byte[] byteArray -> {
+ final String byteString = new String(byteArray,
StandardCharsets.UTF_8);
+ return new BigDecimal(byteString);
+ }
default -> {
}
}
@@ -1318,6 +1322,7 @@ public class DataTypeUtils {
case null -> false;
case Number ignored -> true;
case String s -> stringPredicate.test(s);
+ case byte[] bytes -> stringPredicate.test(new String(bytes,
StandardCharsets.UTF_8));
default -> false;
};
diff --git
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index cb3074453a..e54ba95f8a 100644
---
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -24,6 +24,9 @@ import
org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -587,24 +590,35 @@ public class TestDataTypeUtils {
assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(3, 1),
DataTypeUtils.inferDataType(BigDecimal.valueOf(12.3D), null));
}
- @Test
- public void testIsBigDecimalTypeCompatible() {
- assertTrue(DataTypeUtils.isDecimalTypeCompatible((byte) 13));
- assertTrue(DataTypeUtils.isDecimalTypeCompatible((short) 13));
- assertTrue(DataTypeUtils.isDecimalTypeCompatible(12));
- assertTrue(DataTypeUtils.isDecimalTypeCompatible(12L));
-
assertTrue(DataTypeUtils.isDecimalTypeCompatible(BigInteger.valueOf(12L)));
- assertTrue(DataTypeUtils.isDecimalTypeCompatible(12.123F));
- assertTrue(DataTypeUtils.isDecimalTypeCompatible(12.123D));
-
assertTrue(DataTypeUtils.isDecimalTypeCompatible(BigDecimal.valueOf(12.123D)));
- assertTrue(DataTypeUtils.isDecimalTypeCompatible("123"));
-
- assertFalse(DataTypeUtils.isDecimalTypeCompatible(null));
- assertFalse(DataTypeUtils.isDecimalTypeCompatible("test"));
- assertFalse(DataTypeUtils.isDecimalTypeCompatible(new ArrayList<>()));
- // Decimal handling does not support NaN and Infinity as the
underlying BigDecimal is unable to parse
- assertFalse(DataTypeUtils.isDecimalTypeCompatible("NaN"));
- assertFalse(DataTypeUtils.isDecimalTypeCompatible("Infinity"));
+ @ParameterizedTest
+ @MethodSource("decimalTypeCompatibleData")
+ public void testIsBigDecimalTypeCompatible(Object value, boolean
compatible) {
+ if (compatible) {
+ assertTrue(DataTypeUtils.isDecimalTypeCompatible(value));
+ } else {
+ assertFalse(DataTypeUtils.isDecimalTypeCompatible(value));
+ }
+ }
+
+ private static Stream<Arguments> decimalTypeCompatibleData() {
+ return Stream.of(
+ Arguments.argumentSet("byte", (byte) 13, true),
+ Arguments.argumentSet("short", (short) 13, true),
+ Arguments.argumentSet("int", 12, true),
+ Arguments.argumentSet("long", 12L, true),
+ Arguments.argumentSet("BigInteger", BigInteger.valueOf(12L),
true),
+ Arguments.argumentSet("float", 12.123F, true),
+ Arguments.argumentSet("double", 12.123D, true),
+ Arguments.argumentSet("BigDecimal",
BigDecimal.valueOf(12.123D), true),
+ Arguments.argumentSet("integer String", "123", true),
+ Arguments.argumentSet("double as byte array",
"12.00".getBytes(StandardCharsets.UTF_8), true),
+ Arguments.argumentSet("null", null, false),
+ Arguments.argumentSet("non number String", "test", false),
+ Arguments.argumentSet("ArrayList", new ArrayList<>(), false),
+ // Decimal handling does not support NaN and Infinity as the
underlying BigDecimal is unable to parse
+ Arguments.argumentSet("Nan", "NaN", false),
+ Arguments.argumentSet("Infinity", "Infinity", false)
+ );
}
@Test
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index 311c209509..5b6c2e38c4 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -436,4 +436,76 @@ public class TestConvertRecord {
final MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst();
assertFalse(flowFile.getContent().contains("fieldThatShouldBeRemoved"));
}
+
+ @Test
+ public void testJSONWithDefinedFieldTypeBytesAndLogicalTypeDecimal()
throws Exception {
+ final String schema = """
+ {
+ "type": "record",
+ "name": "ExampleRecord",
+ "fields": [
+ {
+ "name": "big_decimal_field",
+ "type": {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 10,
+ "scale": 4
+ },
+ "default": "0.0000"
+ }, {
+ "name": "default_field",
+ "type": {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 10,
+ "scale": 4
+ },
+ "default": "0.0001"
+ }
+ ]
+ }
+ """;
+
+ final String inputContent = """
+ [
+ {
+ "big_decimal_field": 14000.5833
+ }
+ ]
+ """;
+
+ final String expectedContent = """
+ [
+ {
+ "big_decimal_field": 14000.5833,
+ "default_field": 0.0001
+ }
+ ]
+ """.replaceAll("\\s", "");
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService(READER_ID, jsonReader);
+ runner.setProperty(jsonReader,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, schema);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService(WRITER_ID, jsonWriter);
+ runner.setProperty(jsonWriter,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, schema);
+ runner.setProperty(jsonWriter, "Schema Write Strategy",
"full-schema-attribute");
+ runner.enableControllerService(jsonWriter);
+
+ runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
+ runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
+ runner.enqueue(inputContent);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
+
+ final MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst();
+ flowFile.assertContentEquals(expectedContent);
+ }
}