This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f119c49c4d NIFI-12843: Fix incorrect read of parquet data, when
record.count is inherited
f119c49c4d is described below
commit f119c49c4d649e97015a442da26af1c0bcfe16b0
Author: Rajmund Takacs <[email protected]>
AuthorDate: Mon Feb 26 16:52:59 2024 +0100
NIFI-12843: Fix incorrect read of parquet data, when record.count is
inherited
This closes #8452.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../nifi/parquet/record/ParquetRecordReader.java | 9 ++-
.../org/apache/nifi/parquet/TestParquetReader.java | 90 ----------------------
2 files changed, 6 insertions(+), 93 deletions(-)
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
index 380081a3b0..0ed4680c4b 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
@@ -61,10 +61,13 @@ public class ParquetRecordReader implements RecordReader {
final Long offset =
Optional.ofNullable(variables.get(ParquetAttribute.RECORD_OFFSET))
.map(Long::parseLong)
.orElse(null);
+ final String recordCount =
variables.get(ParquetAttribute.RECORD_COUNT);
- recordsToRead =
Optional.ofNullable(variables.get(ParquetAttribute.RECORD_COUNT))
- .map(Long::parseLong)
- .orElse(null);
+ if (offset != null && recordCount != null) {
+ recordsToRead = Long.parseLong(recordCount);
+ } else {
+ recordsToRead = null;
+ }
final long fileStartOffset =
Optional.ofNullable(variables.get(ParquetAttribute.FILE_RANGE_START_OFFSET))
.map(Long::parseLong)
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
index 1b50d131d0..2a22f75345 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
@@ -17,7 +17,6 @@
package org.apache.nifi.parquet;
import static java.util.Collections.emptyMap;
-import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -79,30 +78,6 @@ public class TestParquetReader {
.forEach(i -> assertEquals(ParquetTestUtils.createUser(i),
convertRecordToUser(results.get(i))));
}
- @Test
- public void testReadUsersPartiallyWithLimitedRecordCount() throws
IOException, MalformedRecordException {
- final int numUsers = 25;
- final int expectedRecords = 3;
- final File parquetFile =
ParquetTestUtils.createUsersParquetFile(numUsers);
- final List<Record> results = getRecords(parquetFile,
singletonMap(ParquetAttribute.RECORD_COUNT, "3"));
-
- assertEquals(expectedRecords, results.size());
- IntStream.range(0, expectedRecords)
- .forEach(i -> assertEquals(ParquetTestUtils.createUser(i),
convertRecordToUser(results.get(i))));
- }
-
- @Test
- public void testReadUsersPartiallyWithOffset() throws IOException,
MalformedRecordException {
- final int numUsers = 1000025; // intentionally so large, to test input
with many record groups
- final int expectedRecords = 5;
- final File parquetFile =
ParquetTestUtils.createUsersParquetFile(numUsers);
- final List<Record> results = getRecords(parquetFile,
singletonMap(ParquetAttribute.RECORD_OFFSET, "1000020"));
-
- assertEquals(expectedRecords, results.size());
- IntStream.range(0, expectedRecords)
- .forEach(i -> assertEquals(ParquetTestUtils.createUser(i +
1000020), convertRecordToUser(results.get(i))));
- }
-
@Test
public void testReadUsersPartiallyWithOffsetAndLimitedRecordCount() throws
IOException, MalformedRecordException {
final int numUsers = 1000025; // intentionally so large, to test input
with many record groups
@@ -120,28 +95,6 @@ public class TestParquetReader {
.forEach(i -> assertEquals(ParquetTestUtils.createUser(i +
1000020), convertRecordToUser(results.get(i))));
}
- @Test
- public void testReadUsersPartiallyWithLimitedRecordCountWithinFileRange()
- throws IOException, MalformedRecordException {
- final int numUsers = 1000;
- final int expectedRecords = 3;
- final File parquetFile =
ParquetTestUtils.createUsersParquetFile(numUsers);
- final List<Record> results = getRecords(
- parquetFile,
- new HashMap<String, String>() {
- {
- put(ParquetAttribute.RECORD_COUNT, "3");
- put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
- put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
- }
- }
- );
-
- assertEquals(expectedRecords, results.size());
- IntStream.range(0, expectedRecords)
- .forEach(i -> assertEquals(ParquetTestUtils.createUser(i +
663), convertRecordToUser(results.get(i))));
- }
-
@Test
public void testReadUsersPartiallyWithOffsetWithinFileRange() throws
IOException, MalformedRecordException {
final int numUsers = 1000;
@@ -213,25 +166,6 @@ public class TestParquetReader {
"MapRecord[{name=Bob9, favorite_number=9,
favorite_color=blue9}]");
}
- @Test
- public void testPartialReaderWithLimitedRecordCount() throws
InitializationException, IOException {
- final TestRunner runner =
TestRunners.newTestRunner(TestParquetProcessor.class);
- final ParquetReader parquetReader = new ParquetReader();
-
- runner.addControllerService("reader", parquetReader);
- runner.enableControllerService(parquetReader);
-
- runner.enqueue(Paths.get(PARQUET_PATH),
singletonMap(ParquetAttribute.RECORD_COUNT, "2"));
-
- runner.setProperty(TestParquetProcessor.READER, "reader");
-
- runner.run();
- runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
- runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
- "MapRecord[{name=Bob0, favorite_number=0,
favorite_color=blue0}]\n" +
- "MapRecord[{name=Bob1, favorite_number=1,
favorite_color=blue1}]");
- }
-
@Test
public void testPartialReaderWithOffsetAndLimitedRecordCount() throws
InitializationException, IOException {
final TestRunner runner =
TestRunners.newTestRunner(TestParquetProcessor.class);
@@ -256,30 +190,6 @@ public class TestParquetReader {
"MapRecord[{name=Bob7, favorite_number=7,
favorite_color=blue7}]");
}
- @Test
- public void testPartialReaderWithOffsetOnly() throws
InitializationException, IOException {
- final TestRunner runner =
TestRunners.newTestRunner(TestParquetProcessor.class);
- final ParquetReader parquetReader = new ParquetReader();
-
- runner.addControllerService("reader", parquetReader);
- runner.enableControllerService(parquetReader);
-
- runner.enqueue(Paths.get(PARQUET_PATH),
singletonMap(ParquetAttribute.RECORD_OFFSET, "3"));
-
- runner.setProperty(TestParquetProcessor.READER, "reader");
-
- runner.run();
- runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
- runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
- "MapRecord[{name=Bob3, favorite_number=3,
favorite_color=blue3}]\n" +
- "MapRecord[{name=Bob4, favorite_number=4,
favorite_color=blue4}]\n" +
- "MapRecord[{name=Bob5, favorite_number=5,
favorite_color=blue5}]\n" +
- "MapRecord[{name=Bob6, favorite_number=6,
favorite_color=blue6}]\n" +
- "MapRecord[{name=Bob7, favorite_number=7,
favorite_color=blue7}]\n" +
- "MapRecord[{name=Bob8, favorite_number=8,
favorite_color=blue8}]\n" +
- "MapRecord[{name=Bob9, favorite_number=9,
favorite_color=blue9}]");
- }
-
private List<Record> getRecords(File parquetFile, Map<String, String>
variables)
throws IOException, MalformedRecordException {
final List<Record> results = new ArrayList<>();