This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 96f9443604 NIFI-14246: Make add-list-element-records property configurable in ParquetReader
96f9443604 is described below
commit 96f9443604c95f101521a762860c46eeb7c0aeef
Author: Mark Bathori <[email protected]>
AuthorDate: Fri Feb 7 12:02:23 2025 +0100
NIFI-14246: Make add-list-element-records property configurable in ParquetReader
This closes #9700.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../org/apache/nifi/parquet/ParquetReader.java | 6 ++--
.../apache/nifi/parquet/TestParquetProcessor.java | 28 ++++++++++++++-
.../org/apache/nifi/parquet/TestParquetReader.java | 39 +++++++++++++++++++++
.../resources/TestParquetReaderWithArray.parquet | Bin 0 -> 1082 bytes
4 files changed, 70 insertions(+), 3 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
index 24d8b65525..fb482dff02 100644
---
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
+++
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.parquet;
+import static
org.apache.nifi.parquet.utils.ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS;
+import static
org.apache.nifi.parquet.utils.ParquetUtils.AVRO_READ_COMPATIBILITY;
import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
@@ -31,7 +33,6 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.parquet.record.ParquetRecordReader;
import org.apache.nifi.parquet.utils.ParquetConfig;
-import org.apache.nifi.parquet.utils.ParquetUtils;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@@ -41,7 +42,8 @@ import org.apache.nifi.serialization.RecordReaderFactory;
public class ParquetReader extends AbstractControllerService implements
RecordReaderFactory {
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
- ParquetUtils.AVRO_READ_COMPATIBILITY
+ AVRO_READ_COMPATIBILITY,
+ AVRO_ADD_LIST_ELEMENT_RECORDS
);
@Override
diff --git
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
index abb03876d7..acef833843 100644
---
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
+++
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
@@ -60,7 +61,7 @@ public class TestParquetProcessor extends AbstractProcessor {
final RecordReader reader =
readerFactory.createRecordReader(flowFile, in, getLogger())) {
Record record;
while ((record = reader.nextRecord()) != null) {
- records.add(record.toString());
+ records.add(serializeRecord(record));
}
} catch (Exception e) {
throw new ProcessException(e);
@@ -81,4 +82,29 @@ public class TestParquetProcessor extends AbstractProcessor {
return new HashSet<>(singletonList(SUCCESS));
}
+ private String serializeRecord(Record record) {
+ final List<String> result = new ArrayList<>();
+ for (Map.Entry<String, Object> entry : record.toMap().entrySet()) {
+ result.add(entry.getKey() + "=" +
serializeField(record.getValue(entry.getKey())));
+ }
+
+ return "MapRecord[{" + String.join(", ", result) + "}]";
+ }
+
+ private String serializeField(Object value) {
+ final StringBuilder result = new StringBuilder();
+ if (value instanceof Object[]) {
+ final List<String> array = new ArrayList<>();
+ for (Object arrayValue : (Object[]) value) {
+ array.add(serializeField(arrayValue));
+ }
+ result.append("[").append(String.join(", ", array)).append("]");
+ } else if (value instanceof Record) {
+ result.append(serializeRecord((Record) value));
+ } else {
+ result.append(value);
+ }
+
+ return result.toString();
+ }
}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
index c96bdf19e0..f862bfe78c 100644
---
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
+++
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
@@ -18,6 +18,7 @@ package org.apache.nifi.parquet;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toMap;
+import static
org.apache.nifi.parquet.utils.ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.ByteArrayInputStream;
@@ -174,6 +175,44 @@ public class TestParquetReader {
"MapRecord[{name=Bob7, favorite_number=7,
favorite_color=blue7}]");
}
+ @Test
+ public void testReaderWithAddListElementRecordsEnabled() throws
IOException, InitializationException {
+ final TestRunner runner =
TestRunners.newTestRunner(TestParquetProcessor.class);
+ final ParquetReader parquetReader = new ParquetReader();
+
+ runner.addControllerService("reader", parquetReader);
+ runner.setProperty(parquetReader, AVRO_ADD_LIST_ELEMENT_RECORDS,
"true");
+ runner.enableControllerService(parquetReader);
+
+
runner.enqueue(Paths.get("src/test/resources/TestParquetReaderWithArray.parquet"));
+
+ runner.setProperty(TestParquetProcessor.READER, "reader");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).getFirst().assertContentEquals(
+ "MapRecord[{name=Bob, favorite_number=1,
favorite_colors=[MapRecord[{element=blue}], MapRecord[{element=red}],
MapRecord[{element=yellow}]]}]");
+ }
+
+ @Test
+ public void testReaderWithAddListElementRecordsDisabled() throws
IOException, InitializationException {
+ final TestRunner runner =
TestRunners.newTestRunner(TestParquetProcessor.class);
+ final ParquetReader parquetReader = new ParquetReader();
+
+ runner.addControllerService("reader", parquetReader);
+ runner.setProperty(parquetReader, AVRO_ADD_LIST_ELEMENT_RECORDS,
"false");
+ runner.enableControllerService(parquetReader);
+
+
runner.enqueue(Paths.get("src/test/resources/TestParquetReaderWithArray.parquet"));
+
+ runner.setProperty(TestParquetProcessor.READER, "reader");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).getFirst().assertContentEquals(
+ "MapRecord[{name=Bob, favorite_number=1,
favorite_colors=[blue, red, yellow]}]");
+ }
+
private List<Record> getRecords(File parquetFile, Map<String, String>
variables)
throws IOException, MalformedRecordException {
final List<Record> results = new ArrayList<>();
diff --git
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/TestParquetReaderWithArray.parquet
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/TestParquetReaderWithArray.parquet
new file mode 100644
index 0000000000..236d61ddd6
Binary files /dev/null and
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/TestParquetReaderWithArray.parquet
differ