This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 96f9443604 NIFI-14246: Make add-list-element-records property configurable in ParquetReader
96f9443604 is described below

commit 96f9443604c95f101521a762860c46eeb7c0aeef
Author: Mark Bathori <[email protected]>
AuthorDate: Fri Feb 7 12:02:23 2025 +0100

    NIFI-14246: Make add-list-element-records property configurable in ParquetReader
    
    This closes #9700.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../org/apache/nifi/parquet/ParquetReader.java     |   6 ++--
 .../apache/nifi/parquet/TestParquetProcessor.java  |  28 ++++++++++++++-
 .../org/apache/nifi/parquet/TestParquetReader.java |  39 +++++++++++++++++++++
 .../resources/TestParquetReaderWithArray.parquet   | Bin 0 -> 1082 bytes
 4 files changed, 70 insertions(+), 3 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
index 24d8b65525..fb482dff02 100644
--- 
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
+++ 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.parquet;
 
+import static 
org.apache.nifi.parquet.utils.ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS;
+import static 
org.apache.nifi.parquet.utils.ParquetUtils.AVRO_READ_COMPATIBILITY;
 import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
 import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
 
@@ -31,7 +33,6 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.parquet.record.ParquetRecordReader;
 import org.apache.nifi.parquet.utils.ParquetConfig;
-import org.apache.nifi.parquet.utils.ParquetUtils;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 
@@ -41,7 +42,8 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 public class ParquetReader extends AbstractControllerService implements 
RecordReaderFactory {
 
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
-            ParquetUtils.AVRO_READ_COMPATIBILITY
+            AVRO_READ_COMPATIBILITY,
+            AVRO_ADD_LIST_ELEMENT_RECORDS
     );
 
     @Override
diff --git 
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
index abb03876d7..acef833843 100644
--- 
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
+++ 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
@@ -60,7 +61,7 @@ public class TestParquetProcessor extends AbstractProcessor {
              final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
             Record record;
             while ((record = reader.nextRecord()) != null) {
-                records.add(record.toString());
+                records.add(serializeRecord(record));
             }
         } catch (Exception e) {
             throw new ProcessException(e);
@@ -81,4 +82,29 @@ public class TestParquetProcessor extends AbstractProcessor {
         return new HashSet<>(singletonList(SUCCESS));
     }
 
+    private String serializeRecord(Record record) {
+        final List<String> result = new ArrayList<>();
+        for (Map.Entry<String, Object> entry : record.toMap().entrySet()) {
+            result.add(entry.getKey() + "=" + 
serializeField(record.getValue(entry.getKey())));
+        }
+
+        return "MapRecord[{" + String.join(", ", result) + "}]";
+    }
+
+    private String serializeField(Object value) {
+        final StringBuilder result = new StringBuilder();
+        if (value instanceof Object[]) {
+            final List<String> array = new ArrayList<>();
+            for (Object arrayValue : (Object[]) value) {
+                array.add(serializeField(arrayValue));
+            }
+            result.append("[").append(String.join(", ", array)).append("]");
+        } else if (value instanceof Record) {
+            result.append(serializeRecord((Record) value));
+        } else {
+            result.append(value);
+        }
+
+        return result.toString();
+    }
 }
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
index c96bdf19e0..f862bfe78c 100644
--- 
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
+++ 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
@@ -18,6 +18,7 @@ package org.apache.nifi.parquet;
 
 import static java.util.Collections.emptyMap;
 import static java.util.stream.Collectors.toMap;
+import static 
org.apache.nifi.parquet.utils.ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.io.ByteArrayInputStream;
@@ -174,6 +175,44 @@ public class TestParquetReader {
                 "MapRecord[{name=Bob7, favorite_number=7, 
favorite_color=blue7}]");
     }
 
+    @Test
+    public void testReaderWithAddListElementRecordsEnabled() throws 
IOException, InitializationException {
+        final TestRunner runner = 
TestRunners.newTestRunner(TestParquetProcessor.class);
+        final ParquetReader parquetReader = new ParquetReader();
+
+        runner.addControllerService("reader", parquetReader);
+        runner.setProperty(parquetReader, AVRO_ADD_LIST_ELEMENT_RECORDS, 
"true");
+        runner.enableControllerService(parquetReader);
+
+        
runner.enqueue(Paths.get("src/test/resources/TestParquetReaderWithArray.parquet"));
+
+        runner.setProperty(TestParquetProcessor.READER, "reader");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).getFirst().assertContentEquals(
+                "MapRecord[{name=Bob, favorite_number=1, 
favorite_colors=[MapRecord[{element=blue}], MapRecord[{element=red}], 
MapRecord[{element=yellow}]]}]");
+    }
+
+    @Test
+    public void testReaderWithAddListElementRecordsDisabled() throws 
IOException, InitializationException {
+        final TestRunner runner = 
TestRunners.newTestRunner(TestParquetProcessor.class);
+        final ParquetReader parquetReader = new ParquetReader();
+
+        runner.addControllerService("reader", parquetReader);
+        runner.setProperty(parquetReader, AVRO_ADD_LIST_ELEMENT_RECORDS, 
"false");
+        runner.enableControllerService(parquetReader);
+
+        
runner.enqueue(Paths.get("src/test/resources/TestParquetReaderWithArray.parquet"));
+
+        runner.setProperty(TestParquetProcessor.READER, "reader");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).getFirst().assertContentEquals(
+                "MapRecord[{name=Bob, favorite_number=1, 
favorite_colors=[blue, red, yellow]}]");
+    }
+
     private List<Record> getRecords(File parquetFile, Map<String, String> 
variables)
             throws IOException, MalformedRecordException {
         final List<Record> results = new ArrayList<>();
diff --git 
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/TestParquetReaderWithArray.parquet
 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/TestParquetReaderWithArray.parquet
new file mode 100644
index 0000000000..236d61ddd6
Binary files /dev/null and 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/TestParquetReaderWithArray.parquet
 differ

Reply via email to