This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 10f1ab2396 NIFI-14297 Added support to Record Schema for Bytes to 
Decimal Logical Type conversion (#9755)
10f1ab2396 is described below

commit 10f1ab239630867387975512f4630387dbbc100d
Author: dan-s1 <[email protected]>
AuthorDate: Tue Mar 4 16:16:11 2025 -0500

    NIFI-14297 Added support to Record Schema for Bytes to Decimal Logical Type 
conversion (#9755)
    
    - Added support for default value as string in Avro Schema with bytes to 
decimal Logical Type
    
    Co-authored-by: David Handermann <[email protected]>
    Signed-off-by: David Handermann <[email protected]>
---
 .../serialization/record/util/DataTypeUtils.java   |  5 ++
 .../serialization/record/TestDataTypeUtils.java    | 50 +++++++++------
 .../processors/standard/TestConvertRecord.java     | 72 ++++++++++++++++++++++
 3 files changed, 109 insertions(+), 18 deletions(-)

diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 43b8e5ce8d..0e7b0c9b91 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1282,6 +1282,10 @@ public class DataTypeUtils {
                 }
 
             }
+            case byte[] byteArray -> {
+                final String byteString = new String(byteArray, 
StandardCharsets.UTF_8);
+                return new BigDecimal(byteString);
+            }
             default -> {
             }
         }
@@ -1318,6 +1322,7 @@ public class DataTypeUtils {
             case null -> false;
             case Number ignored -> true;
             case String s -> stringPredicate.test(s);
+            case byte[] bytes -> stringPredicate.test(new String(bytes, 
StandardCharsets.UTF_8));
             default -> false;
         };
 
diff --git 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index cb3074453a..e54ba95f8a 100644
--- 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -24,6 +24,9 @@ import 
org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -587,24 +590,35 @@ public class TestDataTypeUtils {
         assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(3, 1), 
DataTypeUtils.inferDataType(BigDecimal.valueOf(12.3D), null));
     }
 
-    @Test
-    public void testIsBigDecimalTypeCompatible() {
-        assertTrue(DataTypeUtils.isDecimalTypeCompatible((byte) 13));
-        assertTrue(DataTypeUtils.isDecimalTypeCompatible((short) 13));
-        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12));
-        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12L));
-        
assertTrue(DataTypeUtils.isDecimalTypeCompatible(BigInteger.valueOf(12L)));
-        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12.123F));
-        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12.123D));
-        
assertTrue(DataTypeUtils.isDecimalTypeCompatible(BigDecimal.valueOf(12.123D)));
-        assertTrue(DataTypeUtils.isDecimalTypeCompatible("123"));
-
-        assertFalse(DataTypeUtils.isDecimalTypeCompatible(null));
-        assertFalse(DataTypeUtils.isDecimalTypeCompatible("test"));
-        assertFalse(DataTypeUtils.isDecimalTypeCompatible(new ArrayList<>()));
-        // Decimal handling does not support NaN and Infinity as the 
underlying BigDecimal is unable to parse
-        assertFalse(DataTypeUtils.isDecimalTypeCompatible("NaN"));
-        assertFalse(DataTypeUtils.isDecimalTypeCompatible("Infinity"));
+    @ParameterizedTest
+    @MethodSource("decimalTypeCompatibleData")
+    public void testIsBigDecimalTypeCompatible(Object value, boolean 
compatible) {
+        if (compatible) {
+            assertTrue(DataTypeUtils.isDecimalTypeCompatible(value));
+        } else {
+            assertFalse(DataTypeUtils.isDecimalTypeCompatible(value));
+        }
+    }
+
+    private static Stream<Arguments> decimalTypeCompatibleData() {
+        return Stream.of(
+                Arguments.argumentSet("byte", (byte) 13, true),
+                Arguments.argumentSet("short", (short) 13, true),
+                Arguments.argumentSet("int", 12, true),
+                Arguments.argumentSet("long", 12L, true),
+                Arguments.argumentSet("BigInteger", BigInteger.valueOf(12L), 
true),
+                Arguments.argumentSet("float", 12.123F, true),
+                Arguments.argumentSet("double", 12.123D, true),
+                Arguments.argumentSet("BigDecimal", 
BigDecimal.valueOf(12.123D), true),
+                Arguments.argumentSet("integer String", "123", true),
+                Arguments.argumentSet("double as byte array", 
"12.00".getBytes(StandardCharsets.UTF_8), true),
+                Arguments.argumentSet("null", null, false),
+                Arguments.argumentSet("non number String", "test", false),
+                Arguments.argumentSet("ArrayList", new ArrayList<>(), false),
+                // Decimal handling does not support NaN and Infinity as the 
underlying BigDecimal is unable to parse
+                Arguments.argumentSet("Nan", "NaN", false),
+                Arguments.argumentSet("Infinity", "Infinity", false)
+        );
     }
 
     @Test
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index 311c209509..5b6c2e38c4 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -436,4 +436,76 @@ public class TestConvertRecord {
         final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst();
         
assertFalse(flowFile.getContent().contains("fieldThatShouldBeRemoved"));
     }
+
+    @Test
+    public void testJSONWithDefinedFieldTypeBytesAndLogicalTypeDecimal() 
throws Exception {
+        final String schema = """
+                {
+                  "type": "record",
+                  "name": "ExampleRecord",
+                  "fields": [
+                    {
+                      "name": "big_decimal_field",
+                      "type": {
+                        "type": "bytes",
+                        "logicalType": "decimal",
+                        "precision": 10,
+                        "scale": 4
+                      },
+                      "default": "0.0000"
+                    }, {
+                      "name": "default_field",
+                      "type": {
+                        "type": "bytes",
+                        "logicalType": "decimal",
+                        "precision": 10,
+                        "scale": 4
+                      },
+                      "default": "0.0001"
+                    }
+                  ]
+                }
+                """;
+
+        final String inputContent = """
+                [
+                  {
+                    "big_decimal_field": 14000.5833
+                  }
+                ]
+                """;
+
+        final String expectedContent = """
+                [
+                  {
+                    "big_decimal_field": 14000.5833,
+                    "default_field": 0.0001
+                  }
+                ]
+                """.replaceAll("\\s", "");
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService(READER_ID, jsonReader);
+        runner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, schema);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService(WRITER_ID, jsonWriter);
+        runner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, schema);
+        runner.setProperty(jsonWriter, "Schema Write Strategy", 
"full-schema-attribute");
+        runner.enableControllerService(jsonWriter);
+
+        runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
+        runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
+        runner.enqueue(inputContent);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst();
+        flowFile.assertContentEquals(expectedContent);
+    }
 }

Reply via email to