This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a5ec83baa NIFI-12241 Efficient Parquet Splitting
9a5ec83baa is described below

commit 9a5ec83baa1b3593031f0917659a69e7a36bb0be
Author: Rajmund Takacs <[email protected]>
AuthorDate: Wed Oct 18 10:24:28 2023 +0200

    NIFI-12241 Efficient Parquet Splitting
    
    This closes #7893.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../org/apache/nifi/util/MockProcessSession.java   |   5 +
 .../org/apache/nifi/parquet/ParquetReader.java     |  19 +-
 .../nifi/parquet/ParquetRecordSetWriter.java       |  37 ++-
 .../nifi/parquet/filter/OffsetRecordFilter.java    |  45 +++
 .../hadoop/AvroParquetHDFSRecordReader.java        |  17 +-
 .../nifi/parquet/record/ParquetRecordReader.java   |  70 +++-
 .../nifi/parquet/utils/ParquetAttribute.java       |  28 ++
 .../parquet/CalculateParquetOffsets.java           | 218 +++++++++++++
 .../parquet/CalculateParquetRowGroupOffsets.java   | 170 ++++++++++
 .../nifi/processors/parquet/FetchParquet.java      |  42 ++-
 .../services/org.apache.nifi.processor.Processor   |   2 +
 .../org/apache/nifi/parquet/ParquetTestUtils.java  |  98 ++++++
 .../apache/nifi/parquet/TestParquetProcessor.java  |  39 +--
 .../org/apache/nifi/parquet/TestParquetReader.java | 284 +++++++++++++----
 .../parquet/CalculateParquetOffsetsTest.java       | 354 +++++++++++++++++++++
 .../CalculateParquetRowGroupOffsetsTest.java       | 148 +++++++++
 .../nifi/processors/parquet/FetchParquetTest.java  | 216 +++++++++++--
 17 files changed, 1648 insertions(+), 144 deletions(-)

diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index c04bab70fd..14df9934fc 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -657,6 +657,11 @@ public class MockProcessSession implements ProcessSession {
                 bais.mark(readlimit);
             }
 
+            // mark(int) and reset() of this stream are forwarded to the wrapped 'bais' stream,
+            // so mark support is advertised to callers that check it before calling mark()/reset().
+            @Override
+            public boolean markSupported() {
+                return true;
+            }
+
             @Override
             public void reset() {
                 bais.reset();
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
index 94d2101e67..e84f17ceea 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
@@ -16,6 +16,14 @@
  */
 package org.apache.nifi.parquet;
 
+import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
+import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -28,15 +36,6 @@ import org.apache.nifi.parquet.utils.ParquetUtils;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
-import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
-
 @Tags({"parquet", "parse", "record", "row", "reader"})
 @CapabilityDescription("Parses Parquet data and returns each Parquet record as 
a separate Record object. " +
         "The schema will come from the Parquet data itself.")
@@ -47,7 +46,7 @@ public class ParquetReader extends AbstractControllerService 
implements RecordRe
         final Configuration conf = new Configuration();
         final ParquetConfig parquetConfig = 
createParquetConfig(getConfigurationContext(), variables);
         applyCommonConfig(conf, parquetConfig);
-        return new ParquetRecordReader(in, inputLength, conf);
+        return new ParquetRecordReader(in, inputLength, conf, variables);
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
index 08708fb319..25b3813b19 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
@@ -16,8 +16,21 @@
  */
 package org.apache.nifi.parquet;
 
+import static java.util.stream.Collectors.toMap;
+import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
+
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -27,6 +40,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.parquet.record.WriteParquetResult;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
 import org.apache.nifi.parquet.utils.ParquetConfig;
 import org.apache.nifi.parquet.utils.ParquetUtils;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -37,15 +51,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
 import org.apache.nifi.serialization.record.RecordSchema;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
-
 @Tags({"parquet", "result", "set", "writer", "serializer", "record", 
"recordset", "row"})
 @CapabilityDescription("Writes the contents of a RecordSet in Parquet format.")
 public class ParquetRecordSetWriter extends SchemaRegistryRecordSetWriter 
implements RecordSetWriterFactory {
@@ -107,7 +112,19 @@ public class ParquetRecordSetWriter extends 
SchemaRegistryRecordSetWriter implem
                 throw new SchemaNotFoundException("Failed to compile Avro 
Schema", e);
             }
 
-            return new WriteParquetResult(avroSchema, recordSchema, 
getSchemaAccessWriter(recordSchema, variables), out, parquetConfig, logger);
+            // These attributes should not be carried over to a newly written 
Parquet RecordSet, because then
+            // processors reading FlowFiles with any of these, might try to 
jump to non-existing locations.
+            final Set<String> propertiesToBeCleared = new 
HashSet<>(Arrays.asList(
+                    ParquetAttribute.RECORD_OFFSET,
+                    ParquetAttribute.FILE_RANGE_START_OFFSET,
+                    ParquetAttribute.FILE_RANGE_END_OFFSET
+            ));
+            final Map<String, String> filteredVariables = variables.entrySet()
+                    .stream()
+                    .filter(entry -> 
!propertiesToBeCleared.contains(entry.getKey()))
+                    .collect(toMap(Entry::getKey, Entry::getValue));
+
+            return new WriteParquetResult(avroSchema, recordSchema, 
getSchemaAccessWriter(recordSchema, filteredVariables), out, parquetConfig, 
logger);
 
         } catch (final SchemaNotFoundException e) {
             throw new ProcessException("Could not determine the Avro Schema to 
use for writing the content", e);
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/filter/OffsetRecordFilter.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/filter/OffsetRecordFilter.java
new file mode 100644
index 0000000000..999378a707
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/filter/OffsetRecordFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.parquet.filter;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.parquet.filter.RecordFilter;
+import org.apache.parquet.filter.UnboundRecordFilter;
+
+/**
+ * Record filter that rejects records until a configured number of them has been skipped,
+ * and accepts every record from then on. Used for 'jumping' to a specific record index.
+ */
+public class OffsetRecordFilter implements RecordFilter {
+
+    private final AtomicLong skipsRemaining;
+
+    private OffsetRecordFilter(AtomicLong skipsRemaining) {
+        this.skipsRemaining = skipsRemaining;
+    }
+
+    /**
+     * Creates an unbound filter that skips the first {@code startIndex} records.
+     * Every filter bound from the returned object shares one counter, so the skip budget
+     * is consumed across all of them rather than restarted per bound filter.
+     *
+     * @param startIndex number of records to reject before records start to match
+     * @return the unbound filter to hand over to the Parquet reader
+     */
+    public static UnboundRecordFilter offset(long startIndex) {
+        final AtomicLong sharedCounter = new AtomicLong(startIndex);
+        return columnReaders -> new OffsetRecordFilter(sharedCounter);
+    }
+
+    @Override
+    public boolean isMatch() {
+        // Read the current value and consume one skip (if any is left) in a single atomic operation.
+        final long remainingBefore = skipsRemaining.getAndUpdate(OffsetRecordFilter::consumeSkip);
+        return remainingBefore == 0;
+    }
+
+    /**
+     * Decrements a positive counter by one; zero and negative values are returned unchanged.
+     */
+    private static long consumeSkip(long remaining) {
+        return remaining > 0 ? remaining - 1 : remaining;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java
index 15c0cd554d..c01bf04eae 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java
@@ -35,11 +35,14 @@ public class AvroParquetHDFSRecordReader implements 
HDFSRecordReader {
     private GenericRecord lastRecord;
     private RecordSchema recordSchema;
     private boolean initialized = false;
+    private final Long recordsToRead;
+    private long recordsRead = 0;
 
     private final ParquetReader<GenericRecord> parquetReader;
 
-    public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> 
parquetReader) {
+    public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> 
parquetReader, final Long recordsToRead) {
         this.parquetReader = parquetReader;
+        this.recordsToRead = recordsToRead;
     }
 
     @Override
@@ -48,7 +51,7 @@ public class AvroParquetHDFSRecordReader implements 
HDFSRecordReader {
             return null;
         }
 
-        lastRecord = parquetReader.read();
+        lastRecord = readNextRecord();
         initialized = true;
 
         if (lastRecord == null) {
@@ -63,10 +66,18 @@ public class AvroParquetHDFSRecordReader implements 
HDFSRecordReader {
         return new MapRecord(recordSchema, values);
     }
 
-
     @Override
     public void close() throws IOException {
         parquetReader.close();
     }
 
+    /**
+     * Fetches the next record from the wrapped Parquet reader, unless the requested number of
+     * records has already been read.
+     *
+     * @return the next record, or {@code null} if the record limit was reached or the reader returned {@code null}
+     * @throws IOException propagated from the wrapped Parquet reader
+     */
+    private GenericRecord readNextRecord() throws IOException {
+        final boolean limitReached = recordsToRead != null && recordsToRead == recordsRead;
+        if (limitReached) {
+            // No more records are available
+            return null;
+        }
+
+        final GenericRecord nextRecord = parquetReader.read();
+        recordsRead++;
+        return nextRecord;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
index 6ff319d6b5..380081a3b0 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
@@ -16,45 +16,79 @@
  */
 package org.apache.nifi.parquet.record;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.parquet.filter.OffsetRecordFilter;
 import org.apache.nifi.parquet.stream.NifiParquetInputFile;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetReader.Builder;
 import org.apache.parquet.io.InputFile;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-
 public class ParquetRecordReader implements RecordReader {
 
     private GenericRecord lastParquetRecord;
-    private RecordSchema recordSchema;
+    private final RecordSchema recordSchema;
 
     private final InputStream inputStream;
-    private final InputFile inputFile;
     private final ParquetReader<GenericRecord> parquetReader;
-
-    public ParquetRecordReader(final InputStream inputStream, final long 
inputLength, final Configuration configuration) throws IOException {
+    private final Long recordsToRead;
+    private long recordsRead = 0;
+
+    public ParquetRecordReader(
+            final InputStream inputStream,
+            final long inputLength,
+            final Configuration configuration,
+            final Map<String, String> variables
+    ) throws IOException {
         if (inputLength < 0) {
             throw new IllegalArgumentException("Invalid input length of '" + 
inputLength + "'. This record reader requires knowing " +
                     "the length of the InputStream and cannot be used in some 
cases where the length may not be known.");
         }
 
+        final Long offset = 
Optional.ofNullable(variables.get(ParquetAttribute.RECORD_OFFSET))
+                .map(Long::parseLong)
+                .orElse(null);
+
+        recordsToRead = 
Optional.ofNullable(variables.get(ParquetAttribute.RECORD_COUNT))
+                .map(Long::parseLong)
+                .orElse(null);
+
+        final long fileStartOffset = 
Optional.ofNullable(variables.get(ParquetAttribute.FILE_RANGE_START_OFFSET))
+                .map(Long::parseLong)
+                .orElse(0L);
+        final long fileEndOffset = 
Optional.ofNullable(variables.get(ParquetAttribute.FILE_RANGE_END_OFFSET))
+                .map(Long::parseLong)
+                .orElse(Long.MAX_VALUE);
+
         this.inputStream = inputStream;
 
-        inputFile = new NifiParquetInputFile(inputStream, inputLength);
-        parquetReader = 
AvroParquetReader.<GenericRecord>builder(inputFile).withConf(configuration).build();
+        final InputFile inputFile = new NifiParquetInputFile(inputStream, 
inputLength);
+
+        final Builder<GenericRecord> builder = 
AvroParquetReader.<GenericRecord>builder(inputFile)
+                .withConf(configuration)
+                .withFileRange(fileStartOffset, fileEndOffset);
+
+        if (offset != null) {
+            
builder.withFilter(FilterCompat.get(OffsetRecordFilter.offset(offset)));
+        }
+
+        parquetReader = builder.build();
 
         // Read the first record so that we can extract the schema
-        lastParquetRecord = parquetReader.read();
+        lastParquetRecord = readNextRecord();
         if (lastParquetRecord == null) {
             throw new EOFException("Unable to obtain schema because no records 
were available");
         }
@@ -75,7 +109,7 @@ public class ParquetRecordReader implements RecordReader {
         final Record record = new MapRecord(recordSchema, values);
 
         // Read the next record and store for next time
-        lastParquetRecord = parquetReader.read();
+        lastParquetRecord = readNextRecord();
 
         // Return the converted record
         return record;
@@ -95,4 +129,14 @@ public class ParquetRecordReader implements RecordReader {
             inputStream.close();
         }
     }
+
+    /**
+     * Fetches the next record from the wrapped Parquet reader, unless the requested number of
+     * records has already been read.
+     *
+     * @return the next record, or {@code null} if the record limit was reached or the reader returned {@code null}
+     * @throws IOException propagated from the wrapped Parquet reader
+     */
+    private GenericRecord readNextRecord() throws IOException {
+        final boolean limitReached = recordsToRead != null && recordsToRead == recordsRead;
+        if (limitReached) {
+            // No more records are available
+            return null;
+        }
+
+        final GenericRecord nextRecord = parquetReader.read();
+        recordsRead++;
+        return nextRecord;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetAttribute.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetAttribute.java
new file mode 100644
index 0000000000..277535cebe
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetAttribute.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.parquet.utils;
+
+/**
+ * Names of the attributes/variables that describe which part of a Parquet file should be read.
+ */
+public final class ParquetAttribute {
+    // Number of leading records to skip; ParquetRecordReader turns it into an offset record filter.
+    public static final String RECORD_OFFSET = "record.offset";
+    // Maximum number of records to return; ParquetRecordReader stops reading once it is reached.
+    public static final String RECORD_COUNT = "record.count";
+    // Start of the file range handed to the Parquet reader builder (0 is used when absent).
+    public static final String FILE_RANGE_START_OFFSET = 
"parquet.file.range.startOffset";
+    // End of the file range handed to the Parquet reader builder (Long.MAX_VALUE is used when absent).
+    public static final String FILE_RANGE_END_OFFSET = 
"parquet.file.range.endOffset";
+
+    // Constants holder, not meant to be instantiated.
+    private ParquetAttribute() {
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetOffsets.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetOffsets.java
new file mode 100644
index 0000000000..5c33c833b8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetOffsets.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.parquet;
+
+import static java.util.Collections.singletonList;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.parquet.stream.NifiParquetInputFile;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+/**
+ * Splits the record range of a Parquet FlowFile into partitions of at most 'Records Per Split' records.
+ * No Parquet content is rewritten: every output FlowFile only carries the {@code record.offset} and
+ * {@code record.count} attributes that tell a downstream reader which records it is responsible for.
+ */
+@Tags({"parquet", "split", "partition", "break apart", "efficient processing", "load balance", "cluster"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "The processor generates N flow files from the input, and adds attributes with the offsets required to read "
+                + "the group of rows in the FlowFile's content. Can be used to increase the overall efficiency of "
+                + "processing extremely large Parquet files."
+)
+@WritesAttributes({
+        @WritesAttribute(
+                attribute = ParquetAttribute.RECORD_OFFSET,
+                description = "Sets the index of first record of the parquet file."
+        ),
+        @WritesAttribute(
+                attribute = ParquetAttribute.RECORD_COUNT,
+                description = "Sets the number of records in the parquet file."
+        )
+})
+@ReadsAttributes({
+        @ReadsAttribute(
+                attribute = ParquetAttribute.RECORD_OFFSET,
+                description = "Gets the index of first record in the input."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.RECORD_COUNT,
+                description = "Gets the number of records in the input."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_START_OFFSET,
+                description = "Gets the start offset of the selected row group in the parquet file."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_END_OFFSET,
+                description = "Gets the end offset of the selected row group in the parquet file."
+        )
+})
+@SideEffectFree
+public class CalculateParquetOffsets extends AbstractProcessor {
+
+    static final PropertyDescriptor PROP_RECORDS_PER_SPLIT = new PropertyDescriptor.Builder()
+            .name("Records Per Split")
+            .description("Specifies how many records should be covered in each FlowFile")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PROP_ZERO_CONTENT_OUTPUT = new PropertyDescriptor.Builder()
+            .name("Zero Content Output")
+            .description("Whether to do, or do not copy the content of input FlowFile.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles, with special attributes that represent a chunk of the input file.")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Arrays.asList(
+            PROP_RECORDS_PER_SPLIT,
+            PROP_ZERO_CONTENT_OUTPUT
+    );
+
+    static final Set<Relationship> RELATIONSHIPS = new HashSet<>(singletonList(REL_SUCCESS));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    /**
+     * Takes one FlowFile, determines the record range it represents and emits one FlowFile per partition.
+     *
+     * @param context used to read the processor properties
+     * @param session used to fetch the input and to create/transfer the partitions
+     * @throws ProcessException if 'Records Per Split' does not evaluate to a positive number,
+     *                          or if the record count cannot be read from the content
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile inputFlowFile = session.get();
+        if (inputFlowFile == null) {
+            return;
+        }
+
+        // The property declares FlowFile-attribute Expression Language support, so it has to be
+        // evaluated against the incoming FlowFile before it can be parsed as a number.
+        final long partitionSize = context.getProperty(PROP_RECORDS_PER_SPLIT)
+                .evaluateAttributeExpressions(inputFlowFile)
+                .asLong();
+        if (partitionSize <= 0) {
+            // The validator cannot check the outcome of an expression; fail clearly instead of dividing by zero.
+            throw new ProcessException(
+                    "'" + PROP_RECORDS_PER_SPLIT.getName() + "' must be a positive number, but was " + partitionSize);
+        }
+        final boolean zeroContentOutput = context.getProperty(PROP_ZERO_CONTENT_OUTPUT).asBoolean();
+
+        final long recordOffset = Optional.ofNullable(inputFlowFile.getAttribute(ParquetAttribute.RECORD_OFFSET))
+                .map(Long::valueOf)
+                .orElse(0L);
+
+        // An offset beyond the end of the file (or a negative count attribute) means there is nothing to split;
+        // clamp to zero so that no negative partition count is derived from it.
+        final long recordCount = Math.max(0L,
+                Optional.ofNullable(inputFlowFile.getAttribute(ParquetAttribute.RECORD_COUNT))
+                        .map(Long::valueOf)
+                        .orElseGet(() -> getRecordCount(session, inputFlowFile) - recordOffset));
+
+        final List<FlowFile> partitions = getPartitions(
+                session, inputFlowFile, partitionSize, recordCount, recordOffset, zeroContentOutput);
+        session.transfer(partitions, REL_SUCCESS);
+        session.adjustCounter("Records Split", recordCount, false);
+        session.adjustCounter("Partitions Created", partitions.size(), false);
+
+        // In copy mode the input itself is reused as the first partition. In every other case
+        // (zero content output, or no partition at all) the input still has to be accounted for.
+        if (zeroContentOutput || partitions.isEmpty()) {
+            session.remove(inputFlowFile);
+        }
+    }
+
+    /**
+     * Reads the number of records from the Parquet content of the FlowFile, restricted to the
+     * file range given by the file range attributes (the whole file when they are absent).
+     *
+     * @param session  used to open the content of the FlowFile
+     * @param flowFile the FlowFile holding the Parquet content
+     * @return the record count reported by the Parquet file reader
+     * @throws ProcessException wrapping any {@link IOException} raised while reading
+     */
+    private long getRecordCount(ProcessSession session, FlowFile flowFile) {
+        final long fileStartOffset = Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_START_OFFSET))
+                .map(Long::parseLong)
+                .orElse(0L);
+        final long fileEndOffset = Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_END_OFFSET))
+                .map(Long::parseLong)
+                .orElse(Long.MAX_VALUE);
+
+        final ParquetReadOptions readOptions = ParquetReadOptions.builder()
+                .withRange(fileStartOffset, fileEndOffset)
+                .build();
+        try (
+                InputStream in = session.read(flowFile);
+                ParquetFileReader reader = new ParquetFileReader(
+                        new NifiParquetInputFile(in, flowFile.getSize()),
+                        readOptions
+                )
+        ) {
+            return reader.getRecordCount();
+        } catch (IOException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+    /**
+     * Creates one FlowFile per partition and tags it with the offset and count of the records it covers.
+     *
+     * @param session           used to create or clone the output FlowFiles
+     * @param inputFlowFile     the FlowFile being split
+     * @param partitionSize     maximum number of records per partition, expected to be positive
+     * @param recordCount       number of records to distribute, expected to be non-negative
+     * @param recordOffset      index of the first record of the input's range
+     * @param zeroContentOutput when {@code true} the outputs are created without the input's content,
+     *                          otherwise the input is reused for the first partition and cloned for the rest
+     * @return the partition FlowFiles, empty when there are no records to distribute
+     */
+    private List<FlowFile> getPartitions(
+            ProcessSession session,
+            FlowFile inputFlowFile,
+            long partitionSize,
+            long recordCount,
+            long recordOffset,
+            boolean zeroContentOutput
+    ) {
+        // Ceiling division: a trailing partial partition still needs its own FlowFile.
+        final long numberOfPartitions = (recordCount / partitionSize) + ((recordCount % partitionSize) > 0 ? 1 : 0);
+        final List<FlowFile> results = new ArrayList<>((int) Math.min(Integer.MAX_VALUE, numberOfPartitions));
+
+        for (long currentPartition = 0; currentPartition < numberOfPartitions; currentPartition++) {
+            final long addedOffset = currentPartition * partitionSize;
+            final FlowFile outputFlowFile;
+            if (zeroContentOutput) {
+                outputFlowFile = session.create(inputFlowFile);
+            } else if (currentPartition == 0) {
+                outputFlowFile = inputFlowFile;
+            } else {
+                outputFlowFile = session.clone(inputFlowFile);
+            }
+
+            final HashMap<String, String> attributes = new HashMap<>();
+            attributes.put(ParquetAttribute.RECORD_OFFSET, Long.toString(recordOffset + addedOffset));
+            // The last partition may hold fewer records than the partition size.
+            attributes.put(ParquetAttribute.RECORD_COUNT, Long.toString(Math.min(partitionSize, recordCount - addedOffset)));
+            results.add(session.putAllAttributes(outputFlowFile, attributes));
+        }
+
+        return results;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsets.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsets.java
new file mode 100644
index 0000000000..39c24eb0be
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsets.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.parquet;
+
+import static java.util.Collections.singletonList;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.parquet.stream.NifiParquetInputFile;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+@Tags({"parquet", "split", "partition", "break apart", "efficient processing", "load balance", "cluster"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "The processor generates one FlowFile from each Row Group of the input, and adds attributes with the offsets "
+                + "required to read the group of rows in the FlowFile's content. Can be used to increase the overall "
+                + "efficiency of processing extremely large Parquet files."
+)
+@WritesAttributes({
+        @WritesAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_START_OFFSET,
+                description = "Sets the start offset of the selected row group in the parquet file."
+        ),
+        @WritesAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_END_OFFSET,
+                description = "Sets the end offset of the selected row group in the parquet file."
+        ),
+        @WritesAttribute(
+                attribute = ParquetAttribute.RECORD_COUNT,
+                description = "Sets the count of records in the selected row group."
+        )
+})
+@SideEffectFree
+public class CalculateParquetRowGroupOffsets extends AbstractProcessor {
+
+    static final PropertyDescriptor PROP_ZERO_CONTENT_OUTPUT = new PropertyDescriptor.Builder()
+            .name("Zero Content Output")
+            .description("Whether to do, or do not copy the content of input FlowFile.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles, with special attributes that represent a chunk of the input file.")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = singletonList(PROP_ZERO_CONTENT_OUTPUT);
+
+    static final Set<Relationship> RELATIONSHIPS = new HashSet<>(singletonList(REL_SUCCESS));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    /**
+     * Reads the Parquet footer of the incoming FlowFile and emits one FlowFile per row group to
+     * {@link #REL_SUCCESS}, each carrying the byte range and record count of its row group.
+     *
+     * @param context provides the value of {@link #PROP_ZERO_CONTENT_OUTPUT}
+     * @param session session used to read the input and to create, clone and transfer FlowFiles
+     * @throws ProcessException if the Parquet footer cannot be read
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final boolean zeroContentOutput = context.getProperty(PROP_ZERO_CONTENT_OUTPUT).asBoolean();
+
+        final ParquetMetadata parquetMetadata = getParquetMetadata(session, original);
+        final List<FlowFile> partitions = getPartitions(session, original, parquetMetadata.getBlocks(), zeroContentOutput);
+        session.transfer(partitions, REL_SUCCESS);
+        session.adjustCounter("Partitions Created", partitions.size(), false);
+
+        if (zeroContentOutput) {
+            // All partitions are fresh children without content, so the input itself is no longer needed.
+            session.remove(original);
+        } else if (partitions.isEmpty()) {
+            // With content-carrying output the input is normally reused as the first partition. When the
+            // footer lists no row groups nothing reused it, so pass it through unchanged instead of
+            // leaving it without a destination in the session.
+            session.transfer(original, REL_SUCCESS);
+        }
+    }
+
+    /**
+     * Opens the FlowFile content as a Parquet file and returns its footer metadata.
+     *
+     * @param session  session used to open the content stream
+     * @param flowFile FlowFile whose content is a Parquet file
+     * @return the footer of the Parquet file
+     * @throws ProcessException wrapping any {@link IOException} raised while reading
+     */
+    private ParquetMetadata getParquetMetadata(ProcessSession session, FlowFile flowFile) {
+        final ParquetReadOptions readOptions = ParquetReadOptions.builder().build();
+        try (
+                final InputStream in = session.read(flowFile);
+                ParquetFileReader reader = new ParquetFileReader(
+                        new NifiParquetInputFile(in, flowFile.getSize()),
+                        readOptions
+                )
+        ) {
+            return reader.getFooter();
+        } catch (IOException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+    /**
+     * Builds one output FlowFile per row group and tags it with the row group's start offset,
+     * end offset (start offset plus compressed size) and row count.
+     *
+     * @param session           session used to create or clone FlowFiles and to set attributes
+     * @param flowFile          the input FlowFile
+     * @param blocks            row group metadata taken from the Parquet footer
+     * @param zeroContentOutput if {@code true} every output is a new child created with
+     *                          {@code session.create(flowFile)}; otherwise the input is reused for the
+     *                          first row group and cloned for each further one
+     * @return the attributed FlowFiles in row group order; empty if {@code blocks} is empty
+     */
+    private List<FlowFile> getPartitions(
+            ProcessSession session,
+            FlowFile flowFile,
+            List<BlockMetaData> blocks,
+            boolean zeroContentOutput
+    ) {
+        final List<FlowFile> results = new ArrayList<>(blocks.size());
+
+        for (int currentPartition = 0; currentPartition < blocks.size(); currentPartition++) {
+            final BlockMetaData currentBlock = blocks.get(currentPartition);
+            final long currentBlockStartOffset = currentBlock.getStartingPos();
+            final long currentBlockEndOffset = currentBlockStartOffset + currentBlock.getCompressedSize();
+
+            final FlowFile outputFlowFile;
+            if (zeroContentOutput) {
+                outputFlowFile = session.create(flowFile);
+            } else if (currentPartition == 0) {
+                outputFlowFile = flowFile;
+            } else {
+                outputFlowFile = session.clone(flowFile);
+            }
+
+            // A plain map instead of double-brace initialisation: no anonymous HashMap subclass that
+            // keeps a reference to this processor instance.
+            final HashMap<String, String> attributes = new HashMap<>();
+            attributes.put(ParquetAttribute.FILE_RANGE_START_OFFSET, String.valueOf(currentBlockStartOffset));
+            attributes.put(ParquetAttribute.FILE_RANGE_END_OFFSET, String.valueOf(currentBlockEndOffset));
+            attributes.put(ParquetAttribute.RECORD_COUNT, String.valueOf(currentBlock.getRowCount()));
+
+            results.add(session.putAllAttributes(outputFlowFile, attributes));
+        }
+
+        return results;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
index d971fee95d..8820a4cbd3 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
@@ -17,10 +17,13 @@
 package org.apache.nifi.processors.parquet;
 
 import java.io.IOException;
+import java.util.Optional;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.Restriction;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -31,11 +34,14 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.RequiredPermission;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.parquet.filter.OffsetRecordFilter;
 import org.apache.nifi.parquet.hadoop.AvroParquetHDFSRecordReader;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.hadoop.AbstractFetchHDFSRecord;
 import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
 import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.InputFile;
@@ -53,6 +59,16 @@ import org.apache.parquet.io.InputFile;
         @WritesAttribute(attribute = "record.count", description = "The number 
of records in the resulting flow file"),
         @WritesAttribute(attribute = "hadoop.file.url", description = "The 
hadoop url for the file is stored in this attribute.")
 })
+// Attributes consulted by createHDFSRecordReader to narrow the read to a record window and/or a byte range.
+@ReadsAttributes({
+        @ReadsAttribute(
+                attribute = ParquetAttribute.RECORD_OFFSET,
+                description = "Gets the index of first record in the input."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.RECORD_COUNT,
+                description = "Gets the number of records in the input."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_START_OFFSET,
+                description = "Gets the start offset of the selected row group in the parquet file."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_END_OFFSET,
+                description = "Gets the end offset of the selected row group in the parquet file."
+        )
+})
 @SeeAlso({PutParquet.class})
 @Restricted(restrictions = {
     @Restriction(
@@ -63,9 +79,31 @@ public class FetchParquet extends AbstractFetchHDFSRecord {
 
     @Override
     public HDFSRecordReader createHDFSRecordReader(final ProcessContext 
context, final FlowFile flowFile, final Configuration conf, final Path path) 
throws IOException {
+        final Long offset = 
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.RECORD_OFFSET))
+                .map(Long::parseLong)
+                .orElse(null);
+
+        final Long count = 
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.RECORD_COUNT))
+                .map(Long::parseLong)
+                .orElse(null);
+
+        final long fileStartOffset = 
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_START_OFFSET))
+                .map(Long::parseLong)
+                .orElse(0L);
+        final long fileEndOffset = 
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_END_OFFSET))
+                .map(Long::parseLong)
+                .orElse(Long.MAX_VALUE);
+
         final InputFile inputFile = HadoopInputFile.fromPath(path, conf);
-        final ParquetReader.Builder<GenericRecord> readerBuilder = 
AvroParquetReader.<GenericRecord>builder(inputFile).withConf(conf);
-        return new AvroParquetHDFSRecordReader(readerBuilder.build());
+        final ParquetReader.Builder<GenericRecord> readerBuilder = 
AvroParquetReader.<GenericRecord>builder(inputFile)
+                .withConf(conf)
+                .withFileRange(fileStartOffset, fileEndOffset);
+
+        if (offset != null) {
+            
readerBuilder.withFilter(FilterCompat.get(OffsetRecordFilter.offset(offset)));
+        }
+
+        return new AvroParquetHDFSRecordReader(readerBuilder.build(), count);
     }
 
 }
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6826d6e74b..0a8a9f3135 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,4 +14,6 @@
 # limitations under the License.
 org.apache.nifi.processors.parquet.PutParquet
 org.apache.nifi.processors.parquet.FetchParquet
+org.apache.nifi.processors.parquet.CalculateParquetOffsets
+org.apache.nifi.processors.parquet.CalculateParquetRowGroupOffsets
 org.apache.nifi.processors.parquet.ConvertAvroToParquet
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
new file mode 100644
index 0000000000..3e4c690fa0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.parquet;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+
+/**
+ * Test helpers that write Parquet files of generated "user" records using the
+ * {@code avro/user.avsc} schema from the test classpath.
+ */
+public class ParquetTestUtils {
+
+    /**
+     * Writes a Parquet file holding {@code numUsers} generated users, numbered from 0.
+     *
+     * @param numUsers number of user records to write
+     * @return the written file, located under {@code target/}
+     * @throws IOException if the schema cannot be loaded or the file cannot be written
+     */
+    public static File createUsersParquetFile(int numUsers) throws IOException {
+        return createUsersParquetFile(IntStream
+                .range(0, numUsers)
+                .mapToObj(ParquetTestUtils::createUser)
+                .collect(toList())
+        );
+    }
+
+    /**
+     * Builds the field map of the i-th generated user.
+     *
+     * @param i index used to derive every field value
+     * @return a map with the keys {@code name}, {@code favorite_number} and {@code favorite_color}
+     */
+    public static Map<String, Object> createUser(int i) {
+        final Map<String, Object> user = new HashMap<>();
+        user.put("name", "Bob" + i);
+        user.put("favorite_number", i);
+        user.put("favorite_color", "blue" + i);
+        return user;
+    }
+
+    private static File createUsersParquetFile(Collection<Map<String, Object>> users) throws IOException {
+        final Schema schema = getSchema();
+        final File parquetFile = new File("target/TestParquetReader-testReadUsers-" + System.currentTimeMillis());
+
+        // write some users to the parquet file...
+        try (final ParquetWriter<GenericRecord> writer = createParquetWriter(schema, parquetFile)) {
+            // A plain loop lets a write failure surface as the declared IOException.
+            for (final Map<String, Object> user : users) {
+                final GenericRecord record = new GenericData.Record(schema);
+                user.forEach(record::put);
+                writer.write(record);
+            }
+        }
+        return parquetFile;
+    }
+
+    private static Schema getSchema() throws IOException {
+        try (InputStream schemaInputStream = ParquetTestUtils.class.getClassLoader().getResourceAsStream("avro/user.avsc")) {
+            // Explicit check so a missing resource is reported even when assertions are disabled.
+            if (schemaInputStream == null) {
+                throw new IOException("Test resource avro/user.avsc was not found on the classpath");
+            }
+            final String schemaString = IOUtils.toString(schemaInputStream, StandardCharsets.UTF_8);
+            return new Schema.Parser().parse(schemaString);
+        }
+    }
+
+    private static ParquetWriter<GenericRecord> createParquetWriter(final Schema schema, final File parquetFile) throws IOException {
+        final Configuration conf = new Configuration();
+        final Path parquetPath = new Path(parquetFile.getPath());
+
+        return AvroParquetWriter.<GenericRecord>builder(HadoopOutputFile.fromPath(parquetPath, conf))
+                .withSchema(schema)
+                .withConf(conf)
+                .withRowGroupSize(8192L)
+                .build();
+    }
+
+    private ParquetTestUtils() {}
+}
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
index 21a81dcfbb..abb03876d7 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
@@ -16,7 +16,13 @@
  */
 package org.apache.nifi.parquet;
 
-import org.apache.commons.io.IOUtils;
+import static java.util.Collections.singletonList;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -24,20 +30,11 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.util.StringUtils;
 
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 public class TestParquetProcessor extends AbstractProcessor {
 
     public static final PropertyDescriptor READER = new 
PropertyDescriptor.Builder()
@@ -47,13 +44,6 @@ public class TestParquetProcessor extends AbstractProcessor {
             .required(true)
             .build();
 
-    public static final PropertyDescriptor PATH = new 
PropertyDescriptor.Builder()
-            .name("path")
-            .description("path")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
     public static final Relationship SUCCESS = new Relationship.Builder()
             .name("success")
             .description("success")
@@ -66,16 +56,7 @@ public class TestParquetProcessor extends AbstractProcessor {
 
         final List<String> records = new ArrayList<>();
 
-        byte[] parquetBytes;
-
-        // read the parquet file into bytes since we can't use a 
FileInputStream since it doesn't support mark/reset
-        try {
-            parquetBytes = IOUtils.toByteArray(new 
File(context.getProperty(PATH).getValue()).toURI());
-        } catch (Exception e) {
-            throw new ProcessException(e);
-        }
-
-        try (final InputStream in = new ByteArrayInputStream(parquetBytes);
+        try (final InputStream in = session.read(flowFile);
              final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
             Record record;
             while ((record = reader.nextRecord()) != null) {
@@ -92,12 +73,12 @@ public class TestParquetProcessor extends AbstractProcessor 
{
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return new ArrayList<PropertyDescriptor>() {{ add(READER); add(PATH); 
}};
+        return singletonList(READER);
     }
 
     @Override
     public Set<Relationship> getRelationships() {
-        return new HashSet<Relationship>() {{ add(SUCCESS); }};
+        return new HashSet<>(singletonList(SUCCESS));
     }
 
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
index 5ec3f1a91e..1b50d131d0 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
@@ -16,48 +16,43 @@
  */
 package org.apache.nifi.parquet;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockConfigurationContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
 @DisabledOnOs({ OS.WINDOWS })
 public class TestParquetReader {
 
     private static final String PARQUET_PATH = 
"src/test/resources/TestParquetReader.parquet";
-    private static final String SCHEMA_PATH = 
"src/test/resources/avro/user.avsc";
 
     private ParquetReader parquetReaderFactory;
     private ComponentLog componentLog;
@@ -75,70 +70,241 @@ public class TestParquetReader {
 
     @Test
     public void testReadUsers() throws IOException, MalformedRecordException {
-        final Schema schema = getSchema();
-        final File parquetFile = new 
File("target/TestParquetReader-testReadUsers-" + System.currentTimeMillis());
-
-        // write some users to the parquet file...
         final int numUsers = 10;
-        try (final ParquetWriter<GenericRecord> writer = 
createParquetWriter(schema, parquetFile)) {
-            for (int i=0; i < numUsers; i++) {
-                final GenericRecord user = new GenericData.Record(schema);
-                user.put("name", "Bob" + i);
-                user.put("favorite_number", i);
-                user.put("favorite_color", "blue" + i);
-                writer.write(user);
-            }
-        }
+        final File parquetFile = 
ParquetTestUtils.createUsersParquetFile(numUsers);
+        final List<Record> results = getRecords(parquetFile, emptyMap());
 
-        // read the parquet file into bytes since we can't use a 
FileInputStream since it doesn't support mark/reset
-        final byte[] parquetBytes = IOUtils.toByteArray(parquetFile.toURI());
+        assertEquals(numUsers, results.size());
+        IntStream.range(0, numUsers)
+                .forEach(i -> assertEquals(ParquetTestUtils.createUser(i), 
convertRecordToUser(results.get(i))));
+    }
 
-        // read the users in using the record reader...
-        try (final InputStream in = new ByteArrayInputStream(parquetBytes);
-             final RecordReader recordReader = 
parquetReaderFactory.createRecordReader(
-                     Collections.emptyMap(), in, parquetFile.length(), 
componentLog)) {
+    // With only a record count set, reading must stop after the first three users of the file.
+    @Test
+    public void testReadUsersPartiallyWithLimitedRecordCount() throws IOException, MalformedRecordException {
+        final int totalUsers = 25;
+        final int requestedCount = 3;
+        final Map<String, String> attributes = singletonMap(ParquetAttribute.RECORD_COUNT, "3");
+
+        final File usersFile = ParquetTestUtils.createUsersParquetFile(totalUsers);
+        final List<Record> records = getRecords(usersFile, attributes);
+
+        assertEquals(requestedCount, records.size());
+        for (int index = 0; index < requestedCount; index++) {
+            assertEquals(ParquetTestUtils.createUser(index), convertRecordToUser(records.get(index)));
+        }
+    }
+
+    // With only a record offset set, reading must start at that record and run to the end of the file.
+    @Test
+    public void testReadUsersPartiallyWithOffset() throws IOException, MalformedRecordException {
+        final int totalUsers = 1000025; // intentionally so large, to test input with many record groups
+        final int remainingUsers = 5;
+        final Map<String, String> attributes = singletonMap(ParquetAttribute.RECORD_OFFSET, "1000020");
+
+        final File usersFile = ParquetTestUtils.createUsersParquetFile(totalUsers);
+        final List<Record> records = getRecords(usersFile, attributes);
+
+        assertEquals(remainingUsers, records.size());
+        for (int index = 0; index < remainingUsers; index++) {
+            assertEquals(ParquetTestUtils.createUser(index + 1000020), convertRecordToUser(records.get(index)));
+        }
+    }
 
-            int recordCount = 0;
-            while (recordReader.nextRecord() != null) {
-                recordCount++;
+    @Test
+    public void testReadUsersPartiallyWithOffsetAndLimitedRecordCount() throws 
IOException, MalformedRecordException {
+        final int numUsers = 1000025; // intentionally so large, to test input 
with many record groups
+        final int expectedRecords = 2;
+        final File parquetFile = 
ParquetTestUtils.createUsersParquetFile(numUsers);
+        final List<Record> results = getRecords(parquetFile, new 
HashMap<String, String>() {
+            {
+                put(ParquetAttribute.RECORD_OFFSET, "1000020");
+                put(ParquetAttribute.RECORD_COUNT, "2");
             }
-            assertEquals(numUsers, recordCount);
-        }
+        });
+
+        assertEquals(expectedRecords, results.size());
+        IntStream.range(0, expectedRecords)
+                .forEach(i -> assertEquals(ParquetTestUtils.createUser(i + 
1000020), convertRecordToUser(results.get(i))));
+    }
+
+    // Record count combined with a byte range: per the assertions, the range selects a row group whose
+    // first record is user 663. NOTE(review): the byte offsets presumably depend on the file layout
+    // produced by ParquetTestUtils - confirm if the writer settings change.
+    @Test
+    public void testReadUsersPartiallyWithLimitedRecordCountWithinFileRange()
+            throws IOException, MalformedRecordException {
+        final int totalUsers = 1000;
+        final int requestedCount = 3;
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(ParquetAttribute.RECORD_COUNT, "3");
+        attributes.put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
+        attributes.put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+
+        final File usersFile = ParquetTestUtils.createUsersParquetFile(totalUsers);
+        final List<Record> records = getRecords(usersFile, attributes);
+
+        assertEquals(requestedCount, records.size());
+        for (int index = 0; index < requestedCount; index++) {
+            assertEquals(ParquetTestUtils.createUser(index + 663), convertRecordToUser(records.get(index)));
+        }
+    }
+
+    // Record offset combined with a byte range: per the assertions, the offset is applied within the
+    // selected range, so the first record returned is user 984 and five records remain.
+    // NOTE(review): the byte offsets presumably depend on the file layout produced by ParquetTestUtils.
+    @Test
+    public void testReadUsersPartiallyWithOffsetWithinFileRange() throws IOException, MalformedRecordException {
+        final int totalUsers = 1000;
+        final int remainingUsers = 5;
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(ParquetAttribute.RECORD_OFFSET, "321");
+        attributes.put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
+        attributes.put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+
+        final File usersFile = ParquetTestUtils.createUsersParquetFile(totalUsers);
+        final List<Record> records = getRecords(usersFile, attributes);
+
+        assertEquals(remainingUsers, records.size());
+        for (int index = 0; index < remainingUsers; index++) {
+            assertEquals(ParquetTestUtils.createUser(index + 984), convertRecordToUser(records.get(index)));
+        }
+    }
+
+    // Record offset, record count and byte range together: per the assertions, reading starts at
+    // user 984 and stops after two records.
+    // NOTE(review): the byte offsets presumably depend on the file layout produced by ParquetTestUtils.
+    @Test
+    public void testReadUsersPartiallyWithOffsetAndLimitedRecordCountWithinFileRange()
+            throws IOException, MalformedRecordException {
+        final int totalUsers = 1000;
+        final int requestedCount = 2;
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(ParquetAttribute.RECORD_OFFSET, "321");
+        attributes.put(ParquetAttribute.RECORD_COUNT, "2");
+        attributes.put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
+        attributes.put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+
+        final File usersFile = ParquetTestUtils.createUsersParquetFile(totalUsers);
+        final List<Record> records = getRecords(usersFile, attributes);
+
+        assertEquals(requestedCount, records.size());
+        for (int index = 0; index < requestedCount; index++) {
+            assertEquals(ParquetTestUtils.createUser(index + 984), convertRecordToUser(records.get(index)));
+        }
     }
 
     @Test
     public void testReader() throws InitializationException, IOException  {
         final TestRunner runner = 
TestRunners.newTestRunner(TestParquetProcessor.class);
+        // No partial-read attributes are set on the FlowFile here, and the assertion below
+        // expects all ten records (Bob0..Bob9) of the test file in the output.
+        final ParquetReader parquetReader = new ParquetReader();
+
+        runner.addControllerService("reader", parquetReader);
+        runner.enableControllerService(parquetReader);
 
+        runner.enqueue(Paths.get(PARQUET_PATH));
+
+        runner.setProperty(TestParquetProcessor.READER, "reader");
 
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
+                "MapRecord[{name=Bob0, favorite_number=0, 
favorite_color=blue0}]\n" +
+                "MapRecord[{name=Bob1, favorite_number=1, 
favorite_color=blue1}]\n" +
+                "MapRecord[{name=Bob2, favorite_number=2, 
favorite_color=blue2}]\n" +
+                "MapRecord[{name=Bob3, favorite_number=3, 
favorite_color=blue3}]\n" +
+                "MapRecord[{name=Bob4, favorite_number=4, 
favorite_color=blue4}]\n" +
+                "MapRecord[{name=Bob5, favorite_number=5, 
favorite_color=blue5}]\n" +
+                "MapRecord[{name=Bob6, favorite_number=6, 
favorite_color=blue6}]\n" +
+                "MapRecord[{name=Bob7, favorite_number=7, 
favorite_color=blue7}]\n" +
+                "MapRecord[{name=Bob8, favorite_number=8, 
favorite_color=blue8}]\n" +
+                "MapRecord[{name=Bob9, favorite_number=9, 
favorite_color=blue9}]");
+    }
+
+    /**
+     * Enqueues the test file with only the record-count attribute set to 2 and expects
+     * the output to contain just the first two records (Bob0 and Bob1).
+     */
+    @Test
+    public void testPartialReaderWithLimitedRecordCount() throws 
InitializationException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(TestParquetProcessor.class);
         final ParquetReader parquetReader = new ParquetReader();
 
         runner.addControllerService("reader", parquetReader);
         runner.enableControllerService(parquetReader);
 
-        runner.enqueue(Paths.get(PARQUET_PATH));
+        runner.enqueue(Paths.get(PARQUET_PATH), 
singletonMap(ParquetAttribute.RECORD_COUNT, "2"));
 
         runner.setProperty(TestParquetProcessor.READER, "reader");
-        runner.setProperty(TestParquetProcessor.PATH, PARQUET_PATH);
 
         runner.run();
         runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
+                "MapRecord[{name=Bob0, favorite_number=0, 
favorite_color=blue0}]\n" +
+                "MapRecord[{name=Bob1, favorite_number=1, 
favorite_color=blue1}]");
     }
 
+    /**
+     * Enqueues the test file with record offset 6 and record count 2 and expects the
+     * output to contain exactly records Bob6 and Bob7.
+     */
+    @Test
+    public void testPartialReaderWithOffsetAndLimitedRecordCount() throws 
InitializationException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(TestParquetProcessor.class);
+        final ParquetReader parquetReader = new ParquetReader();
+
+        runner.addControllerService("reader", parquetReader);
+        runner.enableControllerService(parquetReader);
+
+        runner.enqueue(Paths.get(PARQUET_PATH), new HashMap<String, String>() {
+            {
+                put(ParquetAttribute.RECORD_OFFSET, "6");
+                put(ParquetAttribute.RECORD_COUNT, "2");
+            }
+        });
+
+        runner.setProperty(TestParquetProcessor.READER, "reader");
 
-    private Schema getSchema() throws IOException {
-        final File schemaFile = new File(SCHEMA_PATH);
-        final String schemaString = IOUtils.toString(new 
FileInputStream(schemaFile), StandardCharsets.UTF_8);
-        return new Schema.Parser().parse(schemaString);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
+                "MapRecord[{name=Bob6, favorite_number=6, 
favorite_color=blue6}]\n" +
+                "MapRecord[{name=Bob7, favorite_number=7, 
favorite_color=blue7}]");
+    }
+
+    /**
+     * Enqueues the test file with only a record offset of 3 and expects the output to
+     * run from Bob3 through Bob9, i.e. no upper limit on the number of records.
+     */
+    @Test
+    public void testPartialReaderWithOffsetOnly() throws 
InitializationException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(TestParquetProcessor.class);
+        final ParquetReader parquetReader = new ParquetReader();
+
+        runner.addControllerService("reader", parquetReader);
+        runner.enableControllerService(parquetReader);
+
+        runner.enqueue(Paths.get(PARQUET_PATH), 
singletonMap(ParquetAttribute.RECORD_OFFSET, "3"));
+
+        runner.setProperty(TestParquetProcessor.READER, "reader");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
+                "MapRecord[{name=Bob3, favorite_number=3, 
favorite_color=blue3}]\n" +
+                "MapRecord[{name=Bob4, favorite_number=4, 
favorite_color=blue4}]\n" +
+                "MapRecord[{name=Bob5, favorite_number=5, 
favorite_color=blue5}]\n" +
+                "MapRecord[{name=Bob6, favorite_number=6, 
favorite_color=blue6}]\n" +
+                "MapRecord[{name=Bob7, favorite_number=7, 
favorite_color=blue7}]\n" +
+                "MapRecord[{name=Bob8, favorite_number=8, 
favorite_color=blue8}]\n" +
+                "MapRecord[{name=Bob9, favorite_number=9, 
favorite_color=blue9}]");
     }
 
-    private ParquetWriter<GenericRecord> createParquetWriter(final Schema 
schema, final File parquetFile) throws IOException {
-        final Configuration conf = new Configuration();
-        final Path parquetPath = new Path(parquetFile.getPath());
+    /**
+     * Reads every record of the given Parquet file through the reader factory.
+     *
+     * @param parquetFile the Parquet file to read
+     * @param variables   attributes handed to the reader factory (e.g. record offset/count, file range)
+     * @return all records returned by the reader, in read order
+     */
+    private List<Record> getRecords(File parquetFile, Map<String, String> variables)
+            throws IOException, MalformedRecordException {
+        // The file is buffered in memory and served from a ByteArrayInputStream because a
+        // FileInputStream does not support mark/reset.
+        final byte[] parquetBytes = IOUtils.toByteArray(parquetFile.toURI());
+        final List<Record> records = new ArrayList<>();
+
+        try (final InputStream in = new ByteArrayInputStream(parquetBytes);
+                final RecordReader reader = parquetReaderFactory.createRecordReader(
+                        variables, in, parquetFile.length(), componentLog)) {
+            // nextRecord() returning null marks the end of the input
+            for (Record next = reader.nextRecord(); next != null; next = reader.nextRecord()) {
+                records.add(next);
+            }
+        }
+        return records;
+    }
 
-        return 
AvroParquetWriter.<GenericRecord>builder(HadoopOutputFile.fromPath(parquetPath, 
conf))
-                        .withSchema(schema)
-                        .withConf(conf)
-                        .build();
+    /**
+     * Flattens a record into a map keyed by raw field name, where each value is what
+     * {@code record.getValue(fieldName)} returns, so it can be compared with the
+     * map produced by {@code ParquetTestUtils.createUser}.
+     */
+    private Map<String, Object> convertRecordToUser(Record record) {
+        return record.getRawFieldNames()
+                .stream()
+                .collect(toMap(
+                        fieldName -> fieldName,
+                        record::getValue
+                ));
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetOffsetsTest.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetOffsetsTest.java
new file mode 100644
index 0000000000..9811d03762
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetOffsetsTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.parquet;
+
+import static java.util.Collections.singletonMap;
+import static 
org.apache.nifi.processors.parquet.CalculateParquetOffsets.PROP_RECORDS_PER_SPLIT;
+import static 
org.apache.nifi.processors.parquet.CalculateParquetOffsets.PROP_ZERO_CONTENT_OUTPUT;
+import static 
org.apache.nifi.processors.parquet.CalculateParquetOffsets.REL_SUCCESS;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class CalculateParquetOffsetsTest {
+
+    private static final Path PARQUET_PATH = 
Paths.get("src/test/resources/TestParquetReader.parquet");
+    private static final Path NOT_PARQUET_PATH = 
Paths.get("src/test/resources/core-site.xml");
+
+    private static final Map<String, String> PRESERVED_ATTRIBUTES = new 
HashMap<String, String>() {
+        {
+            put("foo", "bar");
+            put("example", "value");
+        }
+    };
+
+    private TestRunner runner;
+
+    /** Creates a fresh test runner around a new CalculateParquetOffsets instance before each test. */
+    @BeforeEach
+    public void setUp() {
+        runner = TestRunners.newTestRunner(new CalculateParquetOffsets());
+    }
+
+    /**
+     * With 10 records per split, expects a single output FlowFile carrying record count 10,
+     * record offset 0, the unmodified input content and all pre-existing attributes.
+     */
+    @Test
+    public void testSinglePartition() throws Exception {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "10");
+        runner.enqueue(PARQUET_PATH, PRESERVED_ATTRIBUTES);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+
+        final MockFlowFile onlySplit = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        onlySplit.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, "10");
+        onlySplit.assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, "0");
+        onlySplit.assertContentEquals(PARQUET_PATH);
+
+        for (final Map.Entry<String, String> preserved : PRESERVED_ATTRIBUTES.entrySet()) {
+            onlySplit.assertAttributeEquals(preserved.getKey(), preserved.getValue());
+        }
+    }
+
+    @Test
+    public void testEachGoesSeparatePartition() throws Exception {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "1");
+        runner.enqueue(PARQUET_PATH, PRESERVED_ATTRIBUTES);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 10);
+
+        final List<MockFlowFile> results = 
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        for (int i = 0; i < 10; i++) {
+            
results.get(i).assertAttributeEquals(ParquetAttribute.RECORD_COUNT, "1");
+            
results.get(i).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, 
String.valueOf(i));
+            results.get(i).assertContentEquals(PARQUET_PATH);
+        }
+
+        results.forEach(flowFile -> 
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+    }
+
+    /**
+     * With 5 records per split, expects two output FlowFiles of 5 records each,
+     * at record offsets 0 and 5.
+     */
+    @Test
+    public void testHalfPartitions() throws Exception {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "5");
+        runner.enqueue(PARQUET_PATH, PRESERVED_ATTRIBUTES);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, record offset} expected on each output FlowFile, in order
+        final String[][] expected = {{"5", "0"}, {"5", "5"}};
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, expected[i][1]);
+            split.assertContentEquals(PARQUET_PATH);
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    /**
+     * With 8 records per split, expects two output FlowFiles of unequal size:
+     * 8 records at offset 0 and the remaining 2 records at offset 8.
+     */
+    @Test
+    public void testAsymmetricPartitions() throws Exception {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+        runner.enqueue(PARQUET_PATH, PRESERVED_ATTRIBUTES);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, record offset} expected on each output FlowFile, in order
+        final String[][] expected = {{"8", "0"}, {"2", "8"}};
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, expected[i][1]);
+            split.assertContentEquals(PARQUET_PATH);
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    /**
+     * The incoming FlowFile already carries record count 7 and record offset 2. With 3
+     * records per split, expects three output FlowFiles: 3 at offset 2, 3 at offset 5
+     * and 1 at offset 8.
+     */
+    @Test
+    public void testSubPartitioningWithCountAndOffset() throws Exception {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "3");
+
+        final Map<String, String> incomingRange = new HashMap<>();
+        incomingRange.put(ParquetAttribute.RECORD_COUNT, "7");
+        incomingRange.put(ParquetAttribute.RECORD_OFFSET, "2");
+        runner.enqueue(PARQUET_PATH, createAttributes(incomingRange));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 3);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, record offset} expected on each output FlowFile, in order
+        final String[][] expected = {{"3", "2"}, {"3", "5"}, {"1", "8"}};
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, expected[i][1]);
+            split.assertContentEquals(PARQUET_PATH);
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    /**
+     * The incoming FlowFile carries only record count 3. With 2 records per split,
+     * expects two output FlowFiles: 2 at offset 0 and 1 at offset 2.
+     */
+    @Test
+    public void testSubPartitioningWithoutOffset() throws Exception {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "2");
+
+        final Map<String, String> attributes = createAttributes(singletonMap(ParquetAttribute.RECORD_COUNT, "3"));
+        runner.enqueue(PARQUET_PATH, attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, record offset} expected on each output FlowFile, in order
+        final String[][] expected = {{"2", "0"}, {"1", "2"}};
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, expected[i][1]);
+            split.assertContentEquals(PARQUET_PATH);
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    /**
+     * The incoming FlowFile carries only record offset 3. With 5 records per split,
+     * expects two output FlowFiles: 5 at offset 3 and 2 at offset 8.
+     */
+    @Test
+    public void testSubPartitioningWithoutCount() throws Exception {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "5");
+
+        final Map<String, String> attributes = createAttributes(singletonMap(ParquetAttribute.RECORD_OFFSET, "3"));
+        runner.enqueue(PARQUET_PATH, attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, record offset} expected on each output FlowFile, in order
+        final String[][] expected = {{"5", "3"}, {"2", "8"}};
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, expected[i][1]);
+            split.assertContentEquals(PARQUET_PATH);
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    /**
+     * With the zero-content-output property set to true and 8 records per split, expects
+     * the same two splits as the asymmetric case (8 at offset 0, 2 at offset 8), but with
+     * empty FlowFile content.
+     */
+    @Test
+    public void testZeroContentOutput() throws Exception {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+        runner.setProperty(PROP_ZERO_CONTENT_OUTPUT, "true");
+        runner.enqueue(PARQUET_PATH, PRESERVED_ATTRIBUTES);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, record offset} expected on each output FlowFile, in order
+        final String[][] expected = {{"8", "0"}, {"2", "8"}};
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, expected[i][1]);
+            split.assertContentEquals("");
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    @Test
+    public void testEmptyInput() {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+        runner.enqueue("", PRESERVED_ATTRIBUTES);
+
+        final Throwable thrownException = assertThrows(Throwable.class, () -> 
runner.run());
+        assertTrue(thrownException.getMessage().contains("is not a Parquet 
file"));
+    }
+
+    @Test
+    public void testEmptyInputWithOffsetAttribute() {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+        runner.enqueue("", 
createAttributes(singletonMap(ParquetAttribute.RECORD_OFFSET, "4")));
+
+        final Throwable thrownException = assertThrows(Throwable.class, () -> 
runner.run());
+        assertTrue(thrownException.getMessage().contains("is not a Parquet 
file"));
+    }
+
+    /**
+     * Empty content with a record-count attribute of 4: no failure is expected. With 3
+     * records per split, expects two empty-content FlowFiles: 3 at offset 0 and 1 at offset 3.
+     */
+    @Test
+    public void testEmptyInputWithCountAttribute() {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "3");
+
+        final Map<String, String> attributes = createAttributes(singletonMap(ParquetAttribute.RECORD_COUNT, "4"));
+        runner.enqueue("", attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, record offset} expected on each output FlowFile, in order
+        final String[][] expected = {{"3", "0"}, {"1", "3"}};
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, expected[i][1]);
+            split.assertContentEquals("");
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    /**
+     * Empty content with record offset 2 and record count 4: no failure is expected. With
+     * 3 records per split, expects two empty-content FlowFiles: 3 at offset 2 and 1 at offset 5.
+     */
+    @Test
+    public void testEmptyInputWithOffsetAndCountAttributes() {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "3");
+
+        final Map<String, String> incomingRange = new HashMap<>();
+        incomingRange.put(ParquetAttribute.RECORD_OFFSET, "2");
+        incomingRange.put(ParquetAttribute.RECORD_COUNT, "4");
+        runner.enqueue("", createAttributes(incomingRange));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, record offset} expected on each output FlowFile, in order
+        final String[][] expected = {{"3", "2"}, {"1", "5"}};
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, expected[i][1]);
+            split.assertContentEquals("");
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    @Test
+    public void testUnrecognizedInput() throws IOException {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+        runner.enqueue(NOT_PARQUET_PATH, PRESERVED_ATTRIBUTES);
+
+        final Throwable thrownException = assertThrows(Throwable.class, () -> 
runner.run());
+        assertTrue(thrownException.getMessage().contains("is not a Parquet 
file"));
+    }
+
+    @Test
+    public void testUnrecognizedInputWithOffsetAttribute() throws IOException {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+        runner.enqueue(NOT_PARQUET_PATH, 
createAttributes(singletonMap(ParquetAttribute.RECORD_OFFSET, "4")));
+
+        final Throwable thrownException = assertThrows(Throwable.class, () -> 
runner.run());
+        assertTrue(thrownException.getMessage().contains("is not a Parquet 
file"));
+    }
+
+    /**
+     * Non-Parquet content with a record-count attribute of 4: no failure is expected. With
+     * 3 records per split, expects two FlowFiles (3 at offset 0, 1 at offset 3) whose
+     * content equals the original input.
+     */
+    @Test
+    public void testUnrecognizedInputWithCountAttribute() throws IOException {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "3");
+
+        final Map<String, String> attributes = createAttributes(singletonMap(ParquetAttribute.RECORD_COUNT, "4"));
+        runner.enqueue(NOT_PARQUET_PATH, attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, record offset} expected on each output FlowFile, in order
+        final String[][] expected = {{"3", "0"}, {"1", "3"}};
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, expected[i][1]);
+            split.assertContentEquals(NOT_PARQUET_PATH);
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    /**
+     * Non-Parquet content with record offset 2 and record count 4: no failure is expected.
+     * With 3 records per split, expects two FlowFiles (3 at offset 2, 1 at offset 5) whose
+     * content equals the original input.
+     */
+    @Test
+    public void testUnrecognizedInputWithOffsetAndCountAttributes() throws IOException {
+        runner.setProperty(PROP_RECORDS_PER_SPLIT, "3");
+
+        final Map<String, String> incomingRange = new HashMap<>();
+        incomingRange.put(ParquetAttribute.RECORD_OFFSET, "2");
+        incomingRange.put(ParquetAttribute.RECORD_COUNT, "4");
+        runner.enqueue(NOT_PARQUET_PATH, createAttributes(incomingRange));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, record offset} expected on each output FlowFile, in order
+        final String[][] expected = {{"3", "2"}, {"1", "5"}};
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_OFFSET, expected[i][1]);
+            split.assertContentEquals(NOT_PARQUET_PATH);
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    private HashMap<String, String> createAttributes(Map<String, String> 
additionalAttributes) {
+        return new HashMap<String, String>(PRESERVED_ATTRIBUTES) {{
+            putAll(additionalAttributes);
+        }};
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsetsTest.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsetsTest.java
new file mode 100644
index 0000000000..dfd4dbacf4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsetsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.parquet;
+
+import static 
org.apache.nifi.processors.parquet.CalculateParquetOffsets.PROP_ZERO_CONTENT_OUTPUT;
+import static 
org.apache.nifi.processors.parquet.CalculateParquetOffsets.REL_SUCCESS;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.nifi.parquet.ParquetTestUtils;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+@DisabledOnOs({ OS.WINDOWS })
+public class CalculateParquetRowGroupOffsetsTest {
+
+    private static final Path NOT_PARQUET_PATH = 
Paths.get("src/test/resources/core-site.xml");
+
+    private static final Map<String, String> PRESERVED_ATTRIBUTES = new 
HashMap<String, String>() {
+        {
+            put("foo", "bar");
+            put("example", "value");
+        }
+    };
+
+    private TestRunner runner;
+
+    /** Creates a fresh test runner around a new CalculateParquetRowGroupOffsets instance before each test. */
+    @BeforeEach
+    public void setUp() {
+        runner = TestRunners.newTestRunner(new 
CalculateParquetRowGroupOffsets());
+    }
+
+    /**
+     * For a generated 10-user file, expects a single output FlowFile with record count 10
+     * and file range 4-298, carrying the unmodified input content and all pre-existing attributes.
+     */
+    @Test
+    public void testSinglePartition() throws Exception {
+        final Path parquetPath = ParquetTestUtils.createUsersParquetFile(10).toPath();
+        runner.enqueue(parquetPath, PRESERVED_ATTRIBUTES);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+
+        final MockFlowFile onlySplit = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        onlySplit.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, "10");
+        onlySplit.assertAttributeEquals(ParquetAttribute.FILE_RANGE_START_OFFSET, "4");
+        onlySplit.assertAttributeEquals(ParquetAttribute.FILE_RANGE_END_OFFSET, "298");
+        onlySplit.assertContentEquals(parquetPath);
+
+        for (final Map.Entry<String, String> preserved : PRESERVED_ATTRIBUTES.entrySet()) {
+            onlySplit.assertAttributeEquals(preserved.getKey(), preserved.getValue());
+        }
+    }
+
+    /**
+     * For a generated 1000-user file, expects four output FlowFiles, one per expected row
+     * group, with contiguous file ranges and record counts 337, 326, 326 and 11.
+     */
+    @Test
+    public void testEachRowGroupGoesToSeparatePartition() throws Exception {
+        final Path parquetPath = ParquetTestUtils.createUsersParquetFile(1000).toPath();
+        runner.enqueue(parquetPath, PRESERVED_ATTRIBUTES);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 4);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, file range start, file range end} expected on each output FlowFile, in order
+        final String[][] expected = {
+                {"337", "4", "8301"},
+                {"326", "8301", "16543"},
+                {"326", "16543", "24784"},
+                {"11", "24784", "25143"}
+        };
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.FILE_RANGE_START_OFFSET, expected[i][1]);
+            split.assertAttributeEquals(ParquetAttribute.FILE_RANGE_END_OFFSET, expected[i][2]);
+            split.assertContentEquals(parquetPath);
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    /**
+     * For a generated 500-user file with the zero-content-output property set to true,
+     * expects two output FlowFiles (337 and 163 records) with empty content.
+     */
+    @Test
+    public void testZeroContentOutput() throws Exception {
+        final Path parquetPath = ParquetTestUtils.createUsersParquetFile(500).toPath();
+        runner.setProperty(PROP_ZERO_CONTENT_OUTPUT, "true");
+        runner.enqueue(parquetPath, PRESERVED_ATTRIBUTES);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        // {record count, file range start, file range end} expected on each output FlowFile, in order
+        final String[][] expected = {
+                {"337", "4", "8301"},
+                {"163", "8301", "12468"}
+        };
+        for (int i = 0; i < expected.length; i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(ParquetAttribute.RECORD_COUNT, expected[i][0]);
+            split.assertAttributeEquals(ParquetAttribute.FILE_RANGE_START_OFFSET, expected[i][1]);
+            split.assertAttributeEquals(ParquetAttribute.FILE_RANGE_END_OFFSET, expected[i][2]);
+            split.assertContentEquals("");
+        }
+
+        for (final MockFlowFile split : splits) {
+            PRESERVED_ATTRIBUTES.forEach(split::assertAttributeEquals);
+        }
+    }
+
+    @Test
+    public void testEmptyInput() {
+        runner.enqueue("", PRESERVED_ATTRIBUTES);
+
+        final Throwable thrownException = assertThrows(Throwable.class, () -> 
runner.run());
+        assertTrue(thrownException.getMessage().contains("is not a Parquet 
file"));
+    }
+
+    @Test
+    public void testUnrecognizedInput() throws IOException {
+        runner.enqueue(NOT_PARQUET_PATH, PRESERVED_ATTRIBUTES);
+
+        final Throwable thrownException = assertThrows(Throwable.class, () -> 
runner.run());
+        assertTrue(thrownException.getMessage().contains("is not a Parquet 
file"));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
index df412c8fe2..5ee5f4e37f 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
@@ -16,6 +16,23 @@
  */
 package org.apache.nifi.processors.parquet;
 
+import static 
org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -26,6 +43,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.parquet.ParquetTestUtils;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
 import org.apache.nifi.reporting.InitializationException;
@@ -48,24 +67,6 @@ import org.junit.jupiter.api.condition.OS;
 import org.mockito.AdditionalMatchers;
 import org.mockito.Mockito;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.Mockito.when;
-
 @DisabledOnOs({ OS.WINDOWS })
 public class FetchParquetTest {
 
@@ -141,6 +142,185 @@ public class FetchParquetTest {
         verifyCSVRecords(flowFileContent);
     }
 
+    @Test
+    public void testFetchParquetToCSVWithOffsetAndCount() throws IOException, InitializationException {
+        configure(proc);
+
+        final File parquetDir = new File(DIRECTORY);
+        final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
+        writeParquetUsers(parquetFile); // NOTE(review): presumably writes the "BobN,N,blueN" user records read back below - confirm
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+        attributes.put(ParquetAttribute.RECORD_OFFSET, "3"); // with this offset the first expected record is Bob3
+        attributes.put(ParquetAttribute.RECORD_COUNT, "1"); // ask for a single record; exactly one is asserted below
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
+        assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + parquetFile.getName()));
+
+        // the mock record writer writes the header for each record, so strip the headers to get down to just the records
+        String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
+        flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+        assertEquals("Bob3,3,blue3\n", flowFileContent);
+    }
+
+    @Test
+    public void testFetchParquetToCSVWithOffset() throws IOException, InitializationException {
+        configure(proc);
+
+        final File parquetDir = new File(DIRECTORY);
+        final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
+        writeParquetUsers(parquetFile); // NOTE(review): presumably writes the "BobN,N,blueN" user records read back below - confirm
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+        attributes.put(ParquetAttribute.RECORD_OFFSET, "9"); // offset only, no record count; one record (Bob9) is asserted below
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
+        assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + parquetFile.getName()));
+
+        // the mock record writer writes the header for each record, so strip the headers to get down to just the records
+        String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
+        flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+        assertEquals("Bob9,9,blue9\n", flowFileContent);
+    }
+
+    @Test
+    public void testFetchParquetToCSVWithCount() throws IOException, InitializationException {
+        configure(proc);
+
+        final File parquetDir = new File(DIRECTORY);
+        final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
+        writeParquetUsers(parquetFile); // NOTE(review): presumably writes the "BobN,N,blueN" user records read back below - confirm
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+        attributes.put(ParquetAttribute.RECORD_COUNT, "1"); // count only, no record offset; the single expected record is Bob0
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
+        assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + parquetFile.getName()));
+
+        // the mock record writer writes the header for each record, so strip the headers to get down to just the records
+        String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
+        flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+        assertEquals("Bob0,0,blue0\n", flowFileContent);
+    }
+
+    @Test
+    public void testFetchParquetToCSVWithOffsetWithinFileRange() throws IOException, InitializationException {
+        configure(proc);
+
+        final File parquetFile = ParquetTestUtils.createUsersParquetFile(1000); // NOTE(review): presumably a 1000-user file - confirm
+        final File parquetDir = parquetFile.getParentFile();
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+        attributes.put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543"); // NOTE(review): hard-coded offsets, presumably tied to the generated file's layout - confirm
+        attributes.put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+        attributes.put(ParquetAttribute.RECORD_OFFSET, "325"); // offset only, no record count; one record (Bob988) is asserted below
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
+        assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + parquetFile.getName()));
+
+        // the mock record writer writes the header for each record, so strip the headers to get down to just the records
+        String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
+        flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+        assertEquals("Bob988,988,blue988\n", flowFileContent);
+    }
+
+    @Test
+    public void testFetchParquetToCSVWithCountWithinFileRange() throws IOException, InitializationException {
+        configure(proc);
+
+        final File parquetFile = ParquetTestUtils.createUsersParquetFile(1000); // NOTE(review): presumably a 1000-user file - confirm
+        final File parquetDir = parquetFile.getParentFile();
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+        attributes.put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543"); // NOTE(review): hard-coded offsets, presumably tied to the generated file's layout - confirm
+        attributes.put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+        attributes.put(ParquetAttribute.RECORD_COUNT, "1"); // count only, no record offset; the single expected record is Bob663
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
+        assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + parquetFile.getName()));
+
+        // the mock record writer writes the header for each record, so strip the headers to get down to just the records
+        String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
+        flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+        assertEquals("Bob663,663,blue663\n", flowFileContent);
+    }
+
+    @Test
+    public void testFetchParquetToCSVWithOffsetAndCountWithinFileRange() throws IOException, InitializationException {
+        configure(proc);
+
+        final File parquetFile = ParquetTestUtils.createUsersParquetFile(1000); // NOTE(review): presumably a 1000-user file - confirm
+        final File parquetDir = parquetFile.getParentFile();
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+        attributes.put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543"); // NOTE(review): hard-coded offsets, presumably tied to the generated file's layout - confirm
+        attributes.put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+        attributes.put(ParquetAttribute.RECORD_OFFSET, "3"); // with this offset inside the range the expected record is Bob666
+        attributes.put(ParquetAttribute.RECORD_COUNT, "1"); // ask for a single record; exactly one is asserted below
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
+        assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + parquetFile.getName()));
+
+        // the mock record writer writes the header for each record, so strip the headers to get down to just the records
+        String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
+        flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+        assertEquals("Bob666,666,blue666\n", flowFileContent);
+    }
+
     @Test
     public void testFetchWhenELEvaluatesToEmptyShouldRouteFailure() throws 
InitializationException {
         configure(proc);

Reply via email to