This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 387d263b3b NIFI-12241 Efficient Parquet Splitting
387d263b3b is described below
commit 387d263b3b15fa187bcac5bc4e0af00034f5f7ac
Author: Rajmund Takacs <[email protected]>
AuthorDate: Wed Oct 18 10:24:28 2023 +0200
NIFI-12241 Efficient Parquet Splitting
(cherry picked from commit 9a5ec83baa1b3593031f0917659a69e7a36bb0be)
---
.../org/apache/nifi/util/MockProcessSession.java | 5 +
.../org/apache/nifi/parquet/ParquetReader.java | 19 +-
.../nifi/parquet/ParquetRecordSetWriter.java | 37 ++-
.../nifi/parquet/filter/OffsetRecordFilter.java | 45 +++
.../hadoop/AvroParquetHDFSRecordReader.java | 17 +-
.../nifi/parquet/record/ParquetRecordReader.java | 70 +++-
.../nifi/parquet/utils/ParquetAttribute.java | 28 ++
.../parquet/CalculateParquetOffsets.java | 218 +++++++++++++
.../parquet/CalculateParquetRowGroupOffsets.java | 170 ++++++++++
.../nifi/processors/parquet/FetchParquet.java | 53 ++-
.../services/org.apache.nifi.processor.Processor | 2 +
.../org/apache/nifi/parquet/ParquetTestUtils.java | 98 ++++++
.../apache/nifi/parquet/TestParquetProcessor.java | 39 +--
.../org/apache/nifi/parquet/TestParquetReader.java | 284 +++++++++++++----
.../parquet/CalculateParquetOffsetsTest.java | 354 +++++++++++++++++++++
.../CalculateParquetRowGroupOffsetsTest.java | 148 +++++++++
.../nifi/processors/parquet/FetchParquetTest.java | 216 +++++++++++--
17 files changed, 1655 insertions(+), 148 deletions(-)
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index c408c44e5b..963ce6f202 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -665,6 +665,11 @@ public class MockProcessSession implements ProcessSession {
bais.mark(readlimit);
}
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
@Override
public void reset() {
bais.reset();
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
index 94d2101e67..e84f17ceea 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
@@ -16,6 +16,14 @@
*/
package org.apache.nifi.parquet;
+import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
+import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -28,15 +36,6 @@ import org.apache.nifi.parquet.utils.ParquetUtils;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
-import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
-
@Tags({"parquet", "parse", "record", "row", "reader"})
@CapabilityDescription("Parses Parquet data and returns each Parquet record as
a separate Record object. " +
"The schema will come from the Parquet data itself.")
@@ -47,7 +46,7 @@ public class ParquetReader extends AbstractControllerService
implements RecordRe
final Configuration conf = new Configuration();
final ParquetConfig parquetConfig =
createParquetConfig(getConfigurationContext(), variables);
applyCommonConfig(conf, parquetConfig);
- return new ParquetRecordReader(in, inputLength, conf);
+ return new ParquetRecordReader(in, inputLength, conf, variables);
}
@Override
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
index 08708fb319..25b3813b19 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
@@ -16,8 +16,21 @@
*/
package org.apache.nifi.parquet;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
+
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -27,6 +40,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.parquet.record.WriteParquetResult;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
import org.apache.nifi.parquet.utils.ParquetConfig;
import org.apache.nifi.parquet.utils.ParquetUtils;
import org.apache.nifi.processor.exception.ProcessException;
@@ -37,15 +51,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
import org.apache.nifi.serialization.record.RecordSchema;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
-
@Tags({"parquet", "result", "set", "writer", "serializer", "record",
"recordset", "row"})
@CapabilityDescription("Writes the contents of a RecordSet in Parquet format.")
public class ParquetRecordSetWriter extends SchemaRegistryRecordSetWriter
implements RecordSetWriterFactory {
@@ -107,7 +112,19 @@ public class ParquetRecordSetWriter extends
SchemaRegistryRecordSetWriter implem
throw new SchemaNotFoundException("Failed to compile Avro
Schema", e);
}
- return new WriteParquetResult(avroSchema, recordSchema,
getSchemaAccessWriter(recordSchema, variables), out, parquetConfig, logger);
+ // These attributes must not be carried over to a newly written
Parquet RecordSet; otherwise
+ // processors reading FlowFiles that still carry them might try to
jump to non-existent locations.
+ final Set<String> propertiesToBeCleared = new
HashSet<>(Arrays.asList(
+ ParquetAttribute.RECORD_OFFSET,
+ ParquetAttribute.FILE_RANGE_START_OFFSET,
+ ParquetAttribute.FILE_RANGE_END_OFFSET
+ ));
+ final Map<String, String> filteredVariables = variables.entrySet()
+ .stream()
+ .filter(entry ->
!propertiesToBeCleared.contains(entry.getKey()))
+ .collect(toMap(Entry::getKey, Entry::getValue));
+
+ return new WriteParquetResult(avroSchema, recordSchema,
getSchemaAccessWriter(recordSchema, filteredVariables), out, parquetConfig,
logger);
} catch (final SchemaNotFoundException e) {
throw new ProcessException("Could not determine the Avro Schema to
use for writing the content", e);
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/filter/OffsetRecordFilter.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/filter/OffsetRecordFilter.java
new file mode 100644
index 0000000000..999378a707
--- /dev/null
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/filter/OffsetRecordFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.parquet.filter;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.parquet.filter.RecordFilter;
+import org.apache.parquet.filter.UnboundRecordFilter;
+
+/**
+ * Filter to be used for 'jumping' to a specific record index by rejecting the records that precede it.
+ */
+public class OffsetRecordFilter implements RecordFilter {
+
+ private final AtomicLong skipsRemaining;
+
+ public static UnboundRecordFilter offset(long startIndex) {
+ final AtomicLong skipsRemaining = new AtomicLong(startIndex);
+ return readers -> new OffsetRecordFilter(skipsRemaining);
+ }
+
+ private OffsetRecordFilter(AtomicLong skipsRemaining) {
+ this.skipsRemaining = skipsRemaining;
+ }
+
+ @Override
+ public boolean isMatch() {
+ // Atomically read the remaining-skip counter and decrement it while it is
above zero; a record matches only once the counter has already reached zero.
+ return skipsRemaining.getAndUpdate(l -> l > 0 ? l - 1 : l) == 0;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java
index 15c0cd554d..c01bf04eae 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java
@@ -35,11 +35,14 @@ public class AvroParquetHDFSRecordReader implements
HDFSRecordReader {
private GenericRecord lastRecord;
private RecordSchema recordSchema;
private boolean initialized = false;
+ private final Long recordsToRead;
+ private long recordsRead = 0;
private final ParquetReader<GenericRecord> parquetReader;
- public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord>
parquetReader) {
+ public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord>
parquetReader, final Long recordsToRead) {
this.parquetReader = parquetReader;
+ this.recordsToRead = recordsToRead;
}
@Override
@@ -48,7 +51,7 @@ public class AvroParquetHDFSRecordReader implements
HDFSRecordReader {
return null;
}
- lastRecord = parquetReader.read();
+ lastRecord = readNextRecord();
initialized = true;
if (lastRecord == null) {
@@ -63,10 +66,18 @@ public class AvroParquetHDFSRecordReader implements
HDFSRecordReader {
return new MapRecord(recordSchema, values);
}
-
@Override
public void close() throws IOException {
parquetReader.close();
}
+ private GenericRecord readNextRecord() throws IOException {
+ // Stop once the requested number of records (if any was given) has been read
+ if ((recordsToRead != null) && (recordsRead == recordsToRead)) {
+ return null;
+ }
+ GenericRecord result = parquetReader.read();
+ recordsRead++;
+ return result;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
index 6ff319d6b5..380081a3b0 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
@@ -16,45 +16,79 @@
*/
package org.apache.nifi.parquet.record;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.parquet.filter.OffsetRecordFilter;
import org.apache.nifi.parquet.stream.NifiParquetInputFile;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetReader.Builder;
import org.apache.parquet.io.InputFile;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-
public class ParquetRecordReader implements RecordReader {
private GenericRecord lastParquetRecord;
- private RecordSchema recordSchema;
+ private final RecordSchema recordSchema;
private final InputStream inputStream;
- private final InputFile inputFile;
private final ParquetReader<GenericRecord> parquetReader;
-
- public ParquetRecordReader(final InputStream inputStream, final long
inputLength, final Configuration configuration) throws IOException {
+ private final Long recordsToRead;
+ private long recordsRead = 0;
+
+ public ParquetRecordReader(
+ final InputStream inputStream,
+ final long inputLength,
+ final Configuration configuration,
+ final Map<String, String> variables
+ ) throws IOException {
if (inputLength < 0) {
throw new IllegalArgumentException("Invalid input length of '" +
inputLength + "'. This record reader requires knowing " +
"the length of the InputStream and cannot be used in some
cases where the length may not be known.");
}
+ final Long offset =
Optional.ofNullable(variables.get(ParquetAttribute.RECORD_OFFSET))
+ .map(Long::parseLong)
+ .orElse(null);
+
+ recordsToRead =
Optional.ofNullable(variables.get(ParquetAttribute.RECORD_COUNT))
+ .map(Long::parseLong)
+ .orElse(null);
+
+ final long fileStartOffset =
Optional.ofNullable(variables.get(ParquetAttribute.FILE_RANGE_START_OFFSET))
+ .map(Long::parseLong)
+ .orElse(0L);
+ final long fileEndOffset =
Optional.ofNullable(variables.get(ParquetAttribute.FILE_RANGE_END_OFFSET))
+ .map(Long::parseLong)
+ .orElse(Long.MAX_VALUE);
+
this.inputStream = inputStream;
- inputFile = new NifiParquetInputFile(inputStream, inputLength);
- parquetReader =
AvroParquetReader.<GenericRecord>builder(inputFile).withConf(configuration).build();
+ final InputFile inputFile = new NifiParquetInputFile(inputStream,
inputLength);
+
+ final Builder<GenericRecord> builder =
AvroParquetReader.<GenericRecord>builder(inputFile)
+ .withConf(configuration)
+ .withFileRange(fileStartOffset, fileEndOffset);
+
+ if (offset != null) {
+
builder.withFilter(FilterCompat.get(OffsetRecordFilter.offset(offset)));
+ }
+
+ parquetReader = builder.build();
// Read the first record so that we can extract the schema
- lastParquetRecord = parquetReader.read();
+ lastParquetRecord = readNextRecord();
if (lastParquetRecord == null) {
throw new EOFException("Unable to obtain schema because no records
were available");
}
@@ -75,7 +109,7 @@ public class ParquetRecordReader implements RecordReader {
final Record record = new MapRecord(recordSchema, values);
// Read the next record and store for next time
- lastParquetRecord = parquetReader.read();
+ lastParquetRecord = readNextRecord();
// Return the converted record
return record;
@@ -95,4 +129,14 @@ public class ParquetRecordReader implements RecordReader {
inputStream.close();
}
}
+
+ private GenericRecord readNextRecord() throws IOException {
+ // Stop once the requested number of records (if any was given) has been read
+ if ((recordsToRead != null) && (recordsRead == recordsToRead)) {
+ return null;
+ }
+ GenericRecord result = parquetReader.read();
+ recordsRead++;
+ return result;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetAttribute.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetAttribute.java
new file mode 100644
index 0000000000..277535cebe
--- /dev/null
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetAttribute.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.parquet.utils;
+
+public final class ParquetAttribute {
+ public static final String RECORD_OFFSET = "record.offset";
+ public static final String RECORD_COUNT = "record.count";
+ public static final String FILE_RANGE_START_OFFSET =
"parquet.file.range.startOffset";
+ public static final String FILE_RANGE_END_OFFSET =
"parquet.file.range.endOffset";
+
+ private ParquetAttribute() {
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetOffsets.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetOffsets.java
new file mode 100644
index 0000000000..5c33c833b8
--- /dev/null
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetOffsets.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.parquet;
+
+import static java.util.Collections.singletonList;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.parquet.stream.NifiParquetInputFile;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+@Tags({"parquet", "split", "partition", "break apart", "efficient processing",
"load balance", "cluster"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+ "The processor generates N flow files from the input, and adds
attributes with the offsets required to read "
+ + "the group of rows in the FlowFile's content. Can be used to
increase the overall efficiency of "
+ + "processing extremely large Parquet files."
+)
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = ParquetAttribute.RECORD_OFFSET,
+ description = "Sets the index of first record of the parquet
file."
+ ),
+ @WritesAttribute(
+ attribute = ParquetAttribute.RECORD_COUNT,
+ description = "Sets the number of records in the parquet file."
+ )
+})
+@ReadsAttributes({
+ @ReadsAttribute(
+ attribute = ParquetAttribute.RECORD_OFFSET,
+ description = "Gets the index of first record in the input."
+ ),
+ @ReadsAttribute(
+ attribute = ParquetAttribute.RECORD_COUNT,
+ description = "Gets the number of records in the input."
+ ),
+ @ReadsAttribute(
+ attribute = ParquetAttribute.FILE_RANGE_START_OFFSET,
+ description = "Gets the start offset of the selected row group
in the parquet file."
+ ),
+ @ReadsAttribute(
+ attribute = ParquetAttribute.FILE_RANGE_END_OFFSET,
+ description = "Gets the end offset of the selected row group
in the parquet file."
+ )
+})
+@SideEffectFree
+public class CalculateParquetOffsets extends AbstractProcessor {
+
+ static final PropertyDescriptor PROP_RECORDS_PER_SPLIT = new
PropertyDescriptor.Builder()
+ .name("Records Per Split")
+ .description("Specifies how many records should be covered in each
FlowFile")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor PROP_ZERO_CONTENT_OUTPUT = new
PropertyDescriptor.Builder()
+ .name("Zero Content Output")
+ .description("Whether to do, or do not copy the content of input
FlowFile.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFiles, with special attributes that represent a
chunk of the input file.")
+ .build();
+
+ static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Arrays.asList(
+ PROP_RECORDS_PER_SPLIT,
+ PROP_ZERO_CONTENT_OUTPUT
+ );
+
+ static final Set<Relationship> RELATIONSHIPS = new
HashSet<>(singletonList(REL_SUCCESS));
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final FlowFile inputFlowFile = session.get();
+ if (inputFlowFile == null) {
+ return;
+ }
+
+ final long partitionSize =
context.getProperty(PROP_RECORDS_PER_SPLIT).asLong();
+ final boolean zeroContentOutput =
context.getProperty(PROP_ZERO_CONTENT_OUTPUT).asBoolean();
+
+ final long recordOffset =
Optional.ofNullable(inputFlowFile.getAttribute(ParquetAttribute.RECORD_OFFSET))
+ .map(Long::valueOf)
+ .orElse(0L);
+
+ final long recordCount =
Optional.ofNullable(inputFlowFile.getAttribute(ParquetAttribute.RECORD_COUNT))
+ .map(Long::valueOf)
+ .orElseGet(() -> getRecordCount(session, inputFlowFile) -
recordOffset);
+
+ List<FlowFile> partitions = getPartitions(
+ session, inputFlowFile, partitionSize, recordCount,
recordOffset, zeroContentOutput);
+ session.transfer(partitions, REL_SUCCESS);
+ session.adjustCounter("Records Split", recordCount, false);
+ session.adjustCounter("Partitions Created", partitions.size(), false);
+
+ if (zeroContentOutput) {
+ session.remove(inputFlowFile);
+ }
+ }
+
+ private long getRecordCount(ProcessSession session, FlowFile flowFile) {
+ final long fileStartOffset =
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_START_OFFSET))
+ .map(Long::parseLong)
+ .orElse(0L);
+ final long fileEndOffset =
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_END_OFFSET))
+ .map(Long::parseLong)
+ .orElse(Long.MAX_VALUE);
+
+ final ParquetReadOptions readOptions = ParquetReadOptions.builder()
+ .withRange(fileStartOffset, fileEndOffset)
+ .build();
+ try (
+ InputStream in = session.read(flowFile);
+ ParquetFileReader reader = new ParquetFileReader(
+ new NifiParquetInputFile(in, flowFile.getSize()),
+ readOptions
+ )
+ ) {
+ return reader.getRecordCount();
+ } catch (IOException e) {
+ throw new ProcessException(e);
+ }
+ }
+
+ private List<FlowFile> getPartitions(
+ ProcessSession session,
+ FlowFile inputFlowFile,
+ long partitionSize,
+ long recordCount,
+ long recordOffset,
+ boolean zeroContentOutput
+ ) {
+ final long numberOfPartitions = (recordCount / partitionSize) +
((recordCount % partitionSize) > 0 ? 1 : 0);
+ final List<FlowFile> results = new
ArrayList<>((int)Math.min(Integer.MAX_VALUE, numberOfPartitions));
+
+ for (long currentPartition = 0; currentPartition < numberOfPartitions;
currentPartition++) {
+ long addedOffset = currentPartition * partitionSize;
+ final FlowFile outputFlowFile;
+ if (zeroContentOutput) {
+ outputFlowFile = session.create(inputFlowFile);
+ } else if (currentPartition == 0) {
+ outputFlowFile = inputFlowFile;
+ } else {
+ outputFlowFile = session.clone(inputFlowFile);
+ }
+ results.add(
+ session.putAllAttributes(
+ outputFlowFile,
+ new HashMap<String, String>() {
+ {
+ put(ParquetAttribute.RECORD_OFFSET,
Long.toString(recordOffset + addedOffset));
+ put(ParquetAttribute.RECORD_COUNT,
Long.toString(Math.min(partitionSize, recordCount - addedOffset)));
+ }
+ }
+ )
+ );
+ }
+
+ return results;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsets.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsets.java
new file mode 100644
index 0000000000..39c24eb0be
--- /dev/null
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsets.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.parquet;
+
+import static java.util.Collections.singletonList;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.parquet.stream.NifiParquetInputFile;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+@Tags({"parquet", "split", "partition", "break apart", "efficient processing",
"load balance", "cluster"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+ "The processor generates one FlowFile from each Row Group of the
input, and adds attributes with the offsets "
+ + "required to read the group of rows in the FlowFile's
content. Can be used to increase the overall "
+ + "efficiency of processing extremely large Parquet files."
+)
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = ParquetAttribute.FILE_RANGE_START_OFFSET,
+ description = "Sets the start offset of the selected row group
in the parquet file."
+ ),
+ @WritesAttribute(
+ attribute = ParquetAttribute.FILE_RANGE_END_OFFSET,
+ description = "Sets the end offset of the selected row group
in the parquet file."
+ ),
+ @WritesAttribute(
+ attribute = ParquetAttribute.RECORD_COUNT,
+ description = "Sets the count of records in the selected row
group."
+ )
+})
+@SideEffectFree
+public class CalculateParquetRowGroupOffsets extends AbstractProcessor {
+
+ static final PropertyDescriptor PROP_ZERO_CONTENT_OUTPUT = new
PropertyDescriptor.Builder()
+ .name("Zero Content Output")
+ .description("Whether to do, or do not copy the content of input
FlowFile.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFiles, with special attributes that represent a
chunk of the input file.")
+ .build();
+
+ static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
singletonList(PROP_ZERO_CONTENT_OUTPUT);
+
+ static final Set<Relationship> RELATIONSHIPS = new
HashSet<>(singletonList(REL_SUCCESS));
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final FlowFile original = session.get();
+ if (original == null) {
+ return;
+ }
+
+ final boolean zeroContentOutput =
context.getProperty(PROP_ZERO_CONTENT_OUTPUT).asBoolean();
+
+ final ParquetMetadata parquetMetadata = getParquetMetadata(session,
original);
+ final List<FlowFile> partitions = getPartitions(session, original,
parquetMetadata.getBlocks(), zeroContentOutput);
+ session.transfer(partitions, REL_SUCCESS);
+ session.adjustCounter("Partitions Created", partitions.size(), false);
+
+ if (zeroContentOutput) {
+ session.remove(original);
+ }
+ }
+
+ private ParquetMetadata getParquetMetadata(ProcessSession session,
FlowFile flowFile) {
+ final ParquetReadOptions readOptions =
ParquetReadOptions.builder().build();
+ try (
+ final InputStream in = session.read(flowFile);
+ ParquetFileReader reader = new ParquetFileReader(
+ new NifiParquetInputFile(in, flowFile.getSize()),
+ readOptions
+ )
+ ) {
+ return reader.getFooter();
+ } catch (IOException e) {
+ throw new ProcessException(e);
+ }
+ }
+
+ private List<FlowFile> getPartitions(
+ ProcessSession session,
+ FlowFile flowFile,
+ List<BlockMetaData> blocks,
+ boolean zeroContentOutput
+ ) {
+ final List<FlowFile> results = new ArrayList<>(blocks.size());
+
+ for (int currentPartition = 0; currentPartition < blocks.size();
currentPartition++) {
+ final BlockMetaData currentBlock = blocks.get(currentPartition);
+ final long currentBlockStartOffset = currentBlock.getStartingPos();
+ final long currentBlockEndOffset = currentBlockStartOffset +
currentBlock.getCompressedSize();
+ final FlowFile outputFlowFile;
+ if (zeroContentOutput) {
+ outputFlowFile = session.create(flowFile);
+ } else if (currentPartition == 0) {
+ outputFlowFile = flowFile;
+ } else {
+ outputFlowFile = session.clone(flowFile);
+ }
+ results.add(
+ session.putAllAttributes(
+ outputFlowFile,
+ new HashMap<String, String>() {
+ {
+
put(ParquetAttribute.FILE_RANGE_START_OFFSET,
String.valueOf(currentBlockStartOffset));
+
put(ParquetAttribute.FILE_RANGE_END_OFFSET,
String.valueOf(currentBlockEndOffset));
+ put(ParquetAttribute.RECORD_COUNT,
String.valueOf(currentBlock.getRowCount()));
+ }
+ }
+ )
+ );
+ }
+
+ return results;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
index 6d289da8ba..7aeddcb0a2 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
@@ -16,10 +16,15 @@
*/
package org.apache.nifi.processors.parquet;
+import java.io.IOException;
+import java.util.Optional;
+
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -30,13 +35,17 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.parquet.filter.OffsetRecordFilter;
+import org.apache.nifi.parquet.hadoop.AvroParquetHDFSRecordReader;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.hadoop.AbstractFetchHDFSRecord;
import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
-import org.apache.nifi.parquet.hadoop.AvroParquetHDFSRecordReader;
import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetReader;
-import java.io.IOException;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -51,6 +60,16 @@ import java.io.IOException;
@WritesAttribute(attribute = "record.count", description = "The number
of records in the resulting flow file"),
@WritesAttribute(attribute = "hadoop.file.url", description = "The
hadoop url for the file is stored in this attribute.")
})
+@ReadsAttributes({
+ @ReadsAttribute(
+ attribute = ParquetAttribute.RECORD_OFFSET,
+ description = "Gets the index of first record in the input."
+ ),
+ @ReadsAttribute(
+ attribute = ParquetAttribute.RECORD_COUNT,
+ description = "Gets the number of records in the input."
+ )
+})
@SeeAlso({PutParquet.class})
@Restricted(restrictions = {
@Restriction(
@@ -60,10 +79,32 @@ import java.io.IOException;
public class FetchParquet extends AbstractFetchHDFSRecord {
@Override
- public HDFSRecordReader createHDFSRecordReader(final ProcessContext
context, final FlowFile flowFile, final Configuration conf, final Path path)
- throws IOException {
- final ParquetReader.Builder<GenericRecord> readerBuilder =
AvroParquetReader.<GenericRecord>builder(path).withConf(conf);
- return new AvroParquetHDFSRecordReader(readerBuilder.build());
+ public HDFSRecordReader createHDFSRecordReader(final ProcessContext
context, final FlowFile flowFile, final Configuration conf, final Path path)
throws IOException {
+ final Long offset =
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.RECORD_OFFSET))
+ .map(Long::parseLong)
+ .orElse(null);
+
+ final Long count =
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.RECORD_COUNT))
+ .map(Long::parseLong)
+ .orElse(null);
+
+ final long fileStartOffset =
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_START_OFFSET))
+ .map(Long::parseLong)
+ .orElse(0L);
+ final long fileEndOffset =
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_END_OFFSET))
+ .map(Long::parseLong)
+ .orElse(Long.MAX_VALUE);
+
+ final InputFile inputFile = HadoopInputFile.fromPath(path, conf);
+ final ParquetReader.Builder<GenericRecord> readerBuilder =
AvroParquetReader.<GenericRecord>builder(inputFile)
+ .withConf(conf)
+ .withFileRange(fileStartOffset, fileEndOffset);
+
+ if (offset != null) {
+
readerBuilder.withFilter(FilterCompat.get(OffsetRecordFilter.offset(offset)));
+ }
+
+ return new AvroParquetHDFSRecordReader(readerBuilder.build(), count);
}
}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6826d6e74b..0a8a9f3135 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,4 +14,6 @@
# limitations under the License.
org.apache.nifi.processors.parquet.PutParquet
org.apache.nifi.processors.parquet.FetchParquet
+org.apache.nifi.processors.parquet.CalculateParquetOffsets
+org.apache.nifi.processors.parquet.CalculateParquetRowGroupOffsets
org.apache.nifi.processors.parquet.ConvertAvroToParquet
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
new file mode 100644
index 0000000000..3e4c690fa0
--- /dev/null
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.parquet;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+
+public class ParquetTestUtils {
+
+ public static File createUsersParquetFile(int numUsers) throws IOException
{
+ return createUsersParquetFile(IntStream
+ .range(0, numUsers)
+ .mapToObj(ParquetTestUtils::createUser)
+ .collect(toList())
+ );
+ }
+
+ public static Map<String, Object> createUser(int i) {
+ return new HashMap<String, Object>() {
+ {
+ put("name", "Bob" + i);
+ put("favorite_number", i);
+ put("favorite_color", "blue" + i);
+ }
+ };
+ }
+
+ private static File createUsersParquetFile(Collection<Map<String, Object>>
users) throws IOException {
+ final Schema schema = getSchema();
+ final File parquetFile = new
File("target/TestParquetReader-testReadUsers-" + System.currentTimeMillis());
+
+ // write some users to the parquet file...
+ try (final ParquetWriter<GenericRecord> writer =
createParquetWriter(schema, parquetFile)) {
+ users.forEach(user -> {
+ final GenericRecord record = new GenericData.Record(schema);
+ user.forEach(record::put);
+ try {
+ writer.write(record);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ return parquetFile;
+ }
+
+ private static Schema getSchema() throws IOException {
+ try (InputStream schemaInputStream =
ParquetTestUtils.class.getClassLoader().getResourceAsStream("avro/user.avsc")) {
+ assert schemaInputStream != null;
+ final String schemaString = IOUtils.toString(schemaInputStream,
StandardCharsets.UTF_8);
+ return new Schema.Parser().parse(schemaString);
+ }
+ }
+
+ private static ParquetWriter<GenericRecord> createParquetWriter(final
Schema schema, final File parquetFile) throws IOException {
+ final Configuration conf = new Configuration();
+ final Path parquetPath = new Path(parquetFile.getPath());
+
+ return
AvroParquetWriter.<GenericRecord>builder(HadoopOutputFile.fromPath(parquetPath,
conf))
+ .withSchema(schema)
+ .withConf(conf)
+ .withRowGroupSize(8192L)
+ .build();
+ }
+
+ private ParquetTestUtils() {}
+}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
index 21a81dcfbb..abb03876d7 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java
@@ -16,7 +16,13 @@
*/
package org.apache.nifi.parquet;
-import org.apache.commons.io.IOUtils;
+import static java.util.Collections.singletonList;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
@@ -24,20 +30,11 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StringUtils;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
public class TestParquetProcessor extends AbstractProcessor {
public static final PropertyDescriptor READER = new
PropertyDescriptor.Builder()
@@ -47,13 +44,6 @@ public class TestParquetProcessor extends AbstractProcessor {
.required(true)
.build();
- public static final PropertyDescriptor PATH = new
PropertyDescriptor.Builder()
- .name("path")
- .description("path")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
public static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("success")
@@ -66,16 +56,7 @@ public class TestParquetProcessor extends AbstractProcessor {
final List<String> records = new ArrayList<>();
- byte[] parquetBytes;
-
- // read the parquet file into bytes since we can't use a
FileInputStream since it doesn't support mark/reset
- try {
- parquetBytes = IOUtils.toByteArray(new
File(context.getProperty(PATH).getValue()).toURI());
- } catch (Exception e) {
- throw new ProcessException(e);
- }
-
- try (final InputStream in = new ByteArrayInputStream(parquetBytes);
+ try (final InputStream in = session.read(flowFile);
final RecordReader reader =
readerFactory.createRecordReader(flowFile, in, getLogger())) {
Record record;
while ((record = reader.nextRecord()) != null) {
@@ -92,12 +73,12 @@ public class TestParquetProcessor extends AbstractProcessor
{
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return new ArrayList<PropertyDescriptor>() {{ add(READER); add(PATH);
}};
+ return singletonList(READER);
}
@Override
public Set<Relationship> getRelationships() {
- return new HashSet<Relationship>() {{ add(SUCCESS); }};
+ return new HashSet<>(singletonList(SUCCESS));
}
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
index 0fccd7de6c..f8d2929ee7 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
@@ -16,48 +16,43 @@
*/
package org.apache.nifi.parquet;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
@DisabledOnOs({ OS.WINDOWS })
public class TestParquetReader {
private static final String PARQUET_PATH =
"src/test/resources/TestParquetReader.parquet";
- private static final String SCHEMA_PATH =
"src/test/resources/avro/user.avsc";
private ParquetReader parquetReaderFactory;
private ComponentLog componentLog;
@@ -75,70 +70,241 @@ public class TestParquetReader {
@Test
public void testReadUsers() throws IOException, MalformedRecordException {
- final Schema schema = getSchema();
- final File parquetFile = new
File("target/TestParquetReader-testReadUsers-" + System.currentTimeMillis());
-
- // write some users to the parquet file...
final int numUsers = 10;
- try (final ParquetWriter<GenericRecord> writer =
createParquetWriter(schema, parquetFile)) {
- for (int i=0; i < numUsers; i++) {
- final GenericRecord user = new GenericData.Record(schema);
- user.put("name", "Bob" + i);
- user.put("favorite_number", i);
- user.put("favorite_color", "blue" + i);
- writer.write(user);
- }
- }
+ final File parquetFile =
ParquetTestUtils.createUsersParquetFile(numUsers);
+ final List<Record> results = getRecords(parquetFile, emptyMap());
- // read the parquet file into bytes since we can't use a
FileInputStream since it doesn't support mark/reset
- final byte[] parquetBytes = IOUtils.toByteArray(parquetFile.toURI());
+ assertEquals(numUsers, results.size());
+ IntStream.range(0, numUsers)
+ .forEach(i -> assertEquals(ParquetTestUtils.createUser(i),
convertRecordToUser(results.get(i))));
+ }
- // read the users in using the record reader...
- try (final InputStream in = new ByteArrayInputStream(parquetBytes);
- final RecordReader recordReader =
parquetReaderFactory.createRecordReader(
- Collections.emptyMap(), in, parquetFile.length(),
componentLog)) {
+ @Test
+ public void testReadUsersPartiallyWithLimitedRecordCount() throws
IOException, MalformedRecordException {
+ final int numUsers = 25;
+ final int expectedRecords = 3;
+ final File parquetFile =
ParquetTestUtils.createUsersParquetFile(numUsers);
+ final List<Record> results = getRecords(parquetFile,
singletonMap(ParquetAttribute.RECORD_COUNT, "3"));
+
+ assertEquals(expectedRecords, results.size());
+ IntStream.range(0, expectedRecords)
+ .forEach(i -> assertEquals(ParquetTestUtils.createUser(i),
convertRecordToUser(results.get(i))));
+ }
+
+ @Test
+ public void testReadUsersPartiallyWithOffset() throws IOException,
MalformedRecordException {
+ final int numUsers = 1000025; // intentionally so large, to test input
with many row groups
+ final int expectedRecords = 5;
+ final File parquetFile =
ParquetTestUtils.createUsersParquetFile(numUsers);
+ final List<Record> results = getRecords(parquetFile,
singletonMap(ParquetAttribute.RECORD_OFFSET, "1000020"));
+
+ assertEquals(expectedRecords, results.size());
+ IntStream.range(0, expectedRecords)
+ .forEach(i -> assertEquals(ParquetTestUtils.createUser(i +
1000020), convertRecordToUser(results.get(i))));
+ }
- int recordCount = 0;
- while (recordReader.nextRecord() != null) {
- recordCount++;
+ @Test
+ public void testReadUsersPartiallyWithOffsetAndLimitedRecordCount() throws
IOException, MalformedRecordException {
+ final int numUsers = 1000025; // intentionally so large, to test input
with many row groups
+ final int expectedRecords = 2;
+ final File parquetFile =
ParquetTestUtils.createUsersParquetFile(numUsers);
+ final List<Record> results = getRecords(parquetFile, new
HashMap<String, String>() {
+ {
+ put(ParquetAttribute.RECORD_OFFSET, "1000020");
+ put(ParquetAttribute.RECORD_COUNT, "2");
}
- assertEquals(numUsers, recordCount);
- }
+ });
+
+ assertEquals(expectedRecords, results.size());
+ IntStream.range(0, expectedRecords)
+ .forEach(i -> assertEquals(ParquetTestUtils.createUser(i +
1000020), convertRecordToUser(results.get(i))));
+ }
+
+ @Test
+ public void testReadUsersPartiallyWithLimitedRecordCountWithinFileRange()
+ throws IOException, MalformedRecordException {
+ final int numUsers = 1000;
+ final int expectedRecords = 3;
+ final File parquetFile =
ParquetTestUtils.createUsersParquetFile(numUsers);
+ final List<Record> results = getRecords(
+ parquetFile,
+ new HashMap<String, String>() {
+ {
+ put(ParquetAttribute.RECORD_COUNT, "3");
+ put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
+ put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+ }
+ }
+ );
+
+ assertEquals(expectedRecords, results.size());
+ IntStream.range(0, expectedRecords)
+ .forEach(i -> assertEquals(ParquetTestUtils.createUser(i +
663), convertRecordToUser(results.get(i))));
+ }
+
+ @Test
+ public void testReadUsersPartiallyWithOffsetWithinFileRange() throws
IOException, MalformedRecordException {
+ final int numUsers = 1000;
+ final int expectedRecords = 5;
+ final File parquetFile =
ParquetTestUtils.createUsersParquetFile(numUsers);
+ final List<Record> results = getRecords(
+ parquetFile,
+ new HashMap<String, String>() {
+ {
+ put(ParquetAttribute.RECORD_OFFSET, "321");
+ put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
+ put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+ }
+ }
+ );
+
+ assertEquals(expectedRecords, results.size());
+ IntStream.range(0, expectedRecords)
+ .forEach(i -> assertEquals(ParquetTestUtils.createUser(i +
984), convertRecordToUser(results.get(i))));
+ }
+
+ @Test
+ public void
testReadUsersPartiallyWithOffsetAndLimitedRecordCountWithinFileRange()
+ throws IOException, MalformedRecordException {
+ final int numUsers = 1000;
+ final int expectedRecords = 2;
+ final File parquetFile =
ParquetTestUtils.createUsersParquetFile(numUsers);
+ final List<Record> results = getRecords(
+ parquetFile,
+ new HashMap<String, String>() {
+ {
+ put(ParquetAttribute.RECORD_OFFSET, "321");
+ put(ParquetAttribute.RECORD_COUNT, "2");
+ put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
+ put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+ }
+ }
+ );
+
+ assertEquals(expectedRecords, results.size());
+ IntStream.range(0, expectedRecords)
+ .forEach(i -> assertEquals(ParquetTestUtils.createUser(i +
984), convertRecordToUser(results.get(i))));
}
@Test
public void testReader() throws InitializationException, IOException {
final TestRunner runner =
TestRunners.newTestRunner(TestParquetProcessor.class);
+ final ParquetReader parquetReader = new ParquetReader();
+
+ runner.addControllerService("reader", parquetReader);
+ runner.enableControllerService(parquetReader);
+ runner.enqueue(Paths.get(PARQUET_PATH));
+
+ runner.setProperty(TestParquetProcessor.READER, "reader");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
+ "MapRecord[{name=Bob0, favorite_number=0,
favorite_color=blue0}]\n" +
+ "MapRecord[{name=Bob1, favorite_number=1,
favorite_color=blue1}]\n" +
+ "MapRecord[{name=Bob2, favorite_number=2,
favorite_color=blue2}]\n" +
+ "MapRecord[{name=Bob3, favorite_number=3,
favorite_color=blue3}]\n" +
+ "MapRecord[{name=Bob4, favorite_number=4,
favorite_color=blue4}]\n" +
+ "MapRecord[{name=Bob5, favorite_number=5,
favorite_color=blue5}]\n" +
+ "MapRecord[{name=Bob6, favorite_number=6,
favorite_color=blue6}]\n" +
+ "MapRecord[{name=Bob7, favorite_number=7,
favorite_color=blue7}]\n" +
+ "MapRecord[{name=Bob8, favorite_number=8,
favorite_color=blue8}]\n" +
+ "MapRecord[{name=Bob9, favorite_number=9,
favorite_color=blue9}]");
+ }
+
+ @Test
+ public void testPartialReaderWithLimitedRecordCount() throws
InitializationException, IOException {
+ final TestRunner runner =
TestRunners.newTestRunner(TestParquetProcessor.class);
final ParquetReader parquetReader = new ParquetReader();
runner.addControllerService("reader", parquetReader);
runner.enableControllerService(parquetReader);
- runner.enqueue(Paths.get(PARQUET_PATH));
+ runner.enqueue(Paths.get(PARQUET_PATH),
singletonMap(ParquetAttribute.RECORD_COUNT, "2"));
runner.setProperty(TestParquetProcessor.READER, "reader");
- runner.setProperty(TestParquetProcessor.PATH, PARQUET_PATH);
runner.run();
runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
+ "MapRecord[{name=Bob0, favorite_number=0,
favorite_color=blue0}]\n" +
+ "MapRecord[{name=Bob1, favorite_number=1,
favorite_color=blue1}]");
}
+ @Test
+ public void testPartialReaderWithOffsetAndLimitedRecordCount() throws
InitializationException, IOException {
+ final TestRunner runner =
TestRunners.newTestRunner(TestParquetProcessor.class);
+ final ParquetReader parquetReader = new ParquetReader();
+
+ runner.addControllerService("reader", parquetReader);
+ runner.enableControllerService(parquetReader);
+
+ runner.enqueue(Paths.get(PARQUET_PATH), new HashMap<String, String>() {
+ {
+ put(ParquetAttribute.RECORD_OFFSET, "6");
+ put(ParquetAttribute.RECORD_COUNT, "2");
+ }
+ });
+
+ runner.setProperty(TestParquetProcessor.READER, "reader");
- private Schema getSchema() throws IOException {
- final File schemaFile = new File(SCHEMA_PATH);
- final String schemaString = IOUtils.toString(new
FileInputStream(schemaFile), StandardCharsets.UTF_8);
- return new Schema.Parser().parse(schemaString);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
+ "MapRecord[{name=Bob6, favorite_number=6,
favorite_color=blue6}]\n" +
+ "MapRecord[{name=Bob7, favorite_number=7,
favorite_color=blue7}]");
+ }
+
+ @Test
+ public void testPartialReaderWithOffsetOnly() throws
InitializationException, IOException {
+ final TestRunner runner =
TestRunners.newTestRunner(TestParquetProcessor.class);
+ final ParquetReader parquetReader = new ParquetReader();
+
+ runner.addControllerService("reader", parquetReader);
+ runner.enableControllerService(parquetReader);
+
+ runner.enqueue(Paths.get(PARQUET_PATH),
singletonMap(ParquetAttribute.RECORD_OFFSET, "3"));
+
+ runner.setProperty(TestParquetProcessor.READER, "reader");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
+ "MapRecord[{name=Bob3, favorite_number=3,
favorite_color=blue3}]\n" +
+ "MapRecord[{name=Bob4, favorite_number=4,
favorite_color=blue4}]\n" +
+ "MapRecord[{name=Bob5, favorite_number=5,
favorite_color=blue5}]\n" +
+ "MapRecord[{name=Bob6, favorite_number=6,
favorite_color=blue6}]\n" +
+ "MapRecord[{name=Bob7, favorite_number=7,
favorite_color=blue7}]\n" +
+ "MapRecord[{name=Bob8, favorite_number=8,
favorite_color=blue8}]\n" +
+ "MapRecord[{name=Bob9, favorite_number=9,
favorite_color=blue9}]");
}
- private ParquetWriter<GenericRecord> createParquetWriter(final Schema
schema, final File parquetFile) throws IOException {
- final Configuration conf = new Configuration();
- final Path parquetPath = new Path(parquetFile.getPath());
+ private List<Record> getRecords(File parquetFile, Map<String, String>
variables)
+ throws IOException, MalformedRecordException {
+ final List<Record> results = new ArrayList<>();
+ // read the parquet file into bytes since we can't use a
FileInputStream, as it doesn't support mark/reset
+ final byte[] parquetBytes = IOUtils.toByteArray(parquetFile.toURI());
+
+ // read the users in using the record reader...
+ try (final InputStream in = new ByteArrayInputStream(parquetBytes);
+ final RecordReader recordReader =
parquetReaderFactory.createRecordReader(
+ variables, in, parquetFile.length(), componentLog)) {
+
+ Record record;
+ while ((record = recordReader.nextRecord()) != null) {
+ results.add(record);
+ }
+ }
+ return results;
+ }
- return
AvroParquetWriter.<GenericRecord>builder(HadoopOutputFile.fromPath(parquetPath,
conf))
- .withSchema(schema)
- .withConf(conf)
- .build();
+ private Map<String, Object> convertRecordToUser(Record record) {
+ return record.getRawFieldNames()
+ .stream()
+ .collect(toMap(
+ fieldName -> fieldName,
+ record::getValue
+ ));
}
}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetOffsetsTest.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetOffsetsTest.java
new file mode 100644
index 0000000000..9811d03762
--- /dev/null
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetOffsetsTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.parquet;
+
+import static java.util.Collections.singletonMap;
+import static
org.apache.nifi.processors.parquet.CalculateParquetOffsets.PROP_RECORDS_PER_SPLIT;
+import static
org.apache.nifi.processors.parquet.CalculateParquetOffsets.PROP_ZERO_CONTENT_OUTPUT;
+import static
org.apache.nifi.processors.parquet.CalculateParquetOffsets.REL_SUCCESS;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class CalculateParquetOffsetsTest {
+
+ private static final Path PARQUET_PATH =
Paths.get("src/test/resources/TestParquetReader.parquet");
+ private static final Path NOT_PARQUET_PATH =
Paths.get("src/test/resources/core-site.xml");
+
+ private static final Map<String, String> PRESERVED_ATTRIBUTES = new
HashMap<String, String>() {
+ {
+ put("foo", "bar");
+ put("example", "value");
+ }
+ };
+
+ private TestRunner runner;
+
+ @BeforeEach
+ public void setUp() {
+ runner = TestRunners.newTestRunner(new CalculateParquetOffsets());
+ }
+
+ @Test
+ public void testSinglePartition() throws Exception {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "10");
+ runner.enqueue(PARQUET_PATH, PRESERVED_ATTRIBUTES);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"10");
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"0");
+ results.get(0).assertContentEquals(PARQUET_PATH);
+
+ PRESERVED_ATTRIBUTES.forEach(results.get(0)::assertAttributeEquals);
+ }
+
+ @Test
+ public void testEachGoesSeparatePartition() throws Exception {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "1");
+ runner.enqueue(PARQUET_PATH, PRESERVED_ATTRIBUTES);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 10);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ for (int i = 0; i < 10; i++) {
+
results.get(i).assertAttributeEquals(ParquetAttribute.RECORD_COUNT, "1");
+
results.get(i).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
String.valueOf(i));
+ results.get(i).assertContentEquals(PARQUET_PATH);
+ }
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testHalfPartitions() throws Exception {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "5");
+ runner.enqueue(PARQUET_PATH, PRESERVED_ATTRIBUTES);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"5");
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"0");
+ results.get(0).assertContentEquals(PARQUET_PATH);
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"5");
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"5");
+ results.get(1).assertContentEquals(PARQUET_PATH);
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testAsymmetricPartitions() throws Exception {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+ runner.enqueue(PARQUET_PATH, PRESERVED_ATTRIBUTES);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"8");
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"0");
+ results.get(0).assertContentEquals(PARQUET_PATH);
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"2");
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"8");
+ results.get(1).assertContentEquals(PARQUET_PATH);
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testSubPartitioningWithCountAndOffset() throws Exception {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "3");
+ runner.enqueue(PARQUET_PATH, createAttributes(new HashMap<String,
String>() {
+ {
+ put(ParquetAttribute.RECORD_COUNT, "7");
+ put(ParquetAttribute.RECORD_OFFSET, "2");
+ }
+ }));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 3);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"3");
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"2");
+ results.get(0).assertContentEquals(PARQUET_PATH);
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"3");
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"5");
+ results.get(1).assertContentEquals(PARQUET_PATH);
+
+ results.get(2).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"1");
+ results.get(2).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"8");
+ results.get(2).assertContentEquals(PARQUET_PATH);
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testSubPartitioningWithoutOffset() throws Exception {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "2");
+ runner.enqueue(PARQUET_PATH,
createAttributes(singletonMap(ParquetAttribute.RECORD_COUNT, "3")));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"2");
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"0");
+ results.get(0).assertContentEquals(PARQUET_PATH);
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"1");
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"2");
+ results.get(1).assertContentEquals(PARQUET_PATH);
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testSubPartitioningWithoutCount() throws Exception {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "5");
+ runner.enqueue(PARQUET_PATH,
createAttributes(singletonMap(ParquetAttribute.RECORD_OFFSET, "3")));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"5");
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"3");
+ results.get(0).assertContentEquals(PARQUET_PATH);
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"2");
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"8");
+ results.get(1).assertContentEquals(PARQUET_PATH);
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testZeroContentOutput() throws Exception {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+ runner.setProperty(PROP_ZERO_CONTENT_OUTPUT, "true");
+ runner.enqueue(PARQUET_PATH, PRESERVED_ATTRIBUTES);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"8");
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"0");
+ results.get(0).assertContentEquals("");
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"2");
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"8");
+ results.get(1).assertContentEquals("");
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testEmptyInput() {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+ runner.enqueue("", PRESERVED_ATTRIBUTES);
+
+ final Throwable thrownException = assertThrows(Throwable.class, () ->
runner.run());
+ assertTrue(thrownException.getMessage().contains("is not a Parquet
file"));
+ }
+
+ @Test
+ public void testEmptyInputWithOffsetAttribute() {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+ runner.enqueue("",
createAttributes(singletonMap(ParquetAttribute.RECORD_OFFSET, "4")));
+
+ final Throwable thrownException = assertThrows(Throwable.class, () ->
runner.run());
+ assertTrue(thrownException.getMessage().contains("is not a Parquet
file"));
+ }
+
+ @Test
+ public void testEmptyInputWithCountAttribute() {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "3");
+ runner.enqueue("",
createAttributes(singletonMap(ParquetAttribute.RECORD_COUNT, "4")));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"3");
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"0");
+ results.get(0).assertContentEquals("");
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"1");
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"3");
+ results.get(1).assertContentEquals("");
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testEmptyInputWithOffsetAndCountAttributes() {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "3");
+ runner.enqueue("", createAttributes(new HashMap<String, String>() {
+ {
+ put(ParquetAttribute.RECORD_OFFSET, "2");
+ put(ParquetAttribute.RECORD_COUNT, "4");
+ }
+ }));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"3");
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"2");
+ results.get(0).assertContentEquals("");
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"1");
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"5");
+ results.get(1).assertContentEquals("");
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testUnrecognizedInput() throws IOException {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+ runner.enqueue(NOT_PARQUET_PATH, PRESERVED_ATTRIBUTES);
+
+ final Throwable thrownException = assertThrows(Throwable.class, () ->
runner.run());
+ assertTrue(thrownException.getMessage().contains("is not a Parquet
file"));
+ }
+
+ @Test
+ public void testUnrecognizedInputWithOffsetAttribute() throws IOException {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "8");
+ runner.enqueue(NOT_PARQUET_PATH,
createAttributes(singletonMap(ParquetAttribute.RECORD_OFFSET, "4")));
+
+ final Throwable thrownException = assertThrows(Throwable.class, () ->
runner.run());
+ assertTrue(thrownException.getMessage().contains("is not a Parquet
file"));
+ }
+
+ @Test
+ public void testUnrecognizedInputWithCountAttribute() throws IOException {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "3");
+ runner.enqueue(NOT_PARQUET_PATH,
createAttributes(singletonMap(ParquetAttribute.RECORD_COUNT, "4")));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"3");
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"0");
+ results.get(0).assertContentEquals(NOT_PARQUET_PATH);
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"1");
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"3");
+ results.get(1).assertContentEquals(NOT_PARQUET_PATH);
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testUnrecognizedInputWithOffsetAndCountAttributes() throws
IOException {
+ runner.setProperty(PROP_RECORDS_PER_SPLIT, "3");
+ runner.enqueue(NOT_PARQUET_PATH, createAttributes(new HashMap<String,
String>() {
+ {
+ put(ParquetAttribute.RECORD_OFFSET, "2");
+ put(ParquetAttribute.RECORD_COUNT, "4");
+ }
+ }));
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"3");
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"2");
+ results.get(0).assertContentEquals(NOT_PARQUET_PATH);
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"1");
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_OFFSET,
"5");
+ results.get(1).assertContentEquals(NOT_PARQUET_PATH);
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ private HashMap<String, String> createAttributes(Map<String, String>
additionalAttributes) {
+ return new HashMap<String, String>(PRESERVED_ATTRIBUTES) {{
+ putAll(additionalAttributes);
+ }};
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsetsTest.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsetsTest.java
new file mode 100644
index 0000000000..dfd4dbacf4
--- /dev/null
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/CalculateParquetRowGroupOffsetsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.parquet;
+
+import static
org.apache.nifi.processors.parquet.CalculateParquetOffsets.PROP_ZERO_CONTENT_OUTPUT;
+import static
org.apache.nifi.processors.parquet.CalculateParquetOffsets.REL_SUCCESS;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.nifi.parquet.ParquetTestUtils;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+@DisabledOnOs({ OS.WINDOWS })
+public class CalculateParquetRowGroupOffsetsTest {
+
+ private static final Path NOT_PARQUET_PATH =
Paths.get("src/test/resources/core-site.xml");
+
+ private static final Map<String, String> PRESERVED_ATTRIBUTES = new
HashMap<String, String>() {
+ {
+ put("foo", "bar");
+ put("example", "value");
+ }
+ };
+
+ private TestRunner runner;
+
+ @BeforeEach
+ public void setUp() {
+ runner = TestRunners.newTestRunner(new
CalculateParquetRowGroupOffsets());
+ }
+
+ @Test
+ public void testSinglePartition() throws Exception {
+ File parquetFile = ParquetTestUtils.createUsersParquetFile(10);
+ runner.enqueue(parquetFile.toPath(), PRESERVED_ATTRIBUTES);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"10");
+
results.get(0).assertAttributeEquals(ParquetAttribute.FILE_RANGE_START_OFFSET,
"4");
+
results.get(0).assertAttributeEquals(ParquetAttribute.FILE_RANGE_END_OFFSET,
"298");
+ results.get(0).assertContentEquals(parquetFile.toPath());
+
+ PRESERVED_ATTRIBUTES.forEach(results.get(0)::assertAttributeEquals);
+ }
+
+ @Test
+ public void testEachRowGroupGoesToSeparatePartition() throws Exception {
+ File parquetFile = ParquetTestUtils.createUsersParquetFile(1000);
+ runner.enqueue(parquetFile.toPath(), PRESERVED_ATTRIBUTES);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 4);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"337");
+
results.get(0).assertAttributeEquals(ParquetAttribute.FILE_RANGE_START_OFFSET,
"4");
+
results.get(0).assertAttributeEquals(ParquetAttribute.FILE_RANGE_END_OFFSET,
"8301");
+ results.get(0).assertContentEquals(parquetFile.toPath());
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"326");
+
results.get(1).assertAttributeEquals(ParquetAttribute.FILE_RANGE_START_OFFSET,
"8301");
+
results.get(1).assertAttributeEquals(ParquetAttribute.FILE_RANGE_END_OFFSET,
"16543");
+ results.get(1).assertContentEquals(parquetFile.toPath());
+
+ results.get(2).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"326");
+
results.get(2).assertAttributeEquals(ParquetAttribute.FILE_RANGE_START_OFFSET,
"16543");
+
results.get(2).assertAttributeEquals(ParquetAttribute.FILE_RANGE_END_OFFSET,
"24784");
+ results.get(2).assertContentEquals(parquetFile.toPath());
+
+ results.get(3).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"11");
+
results.get(3).assertAttributeEquals(ParquetAttribute.FILE_RANGE_START_OFFSET,
"24784");
+
results.get(3).assertAttributeEquals(ParquetAttribute.FILE_RANGE_END_OFFSET,
"25143");
+ results.get(3).assertContentEquals(parquetFile.toPath());
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testZeroContentOutput() throws Exception {
+ File parquetFile = ParquetTestUtils.createUsersParquetFile(500);
+ runner.setProperty(PROP_ZERO_CONTENT_OUTPUT, "true");
+ runner.enqueue(parquetFile.toPath(), PRESERVED_ATTRIBUTES);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ final List<MockFlowFile> results =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ results.get(0).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"337");
+
results.get(0).assertAttributeEquals(ParquetAttribute.FILE_RANGE_START_OFFSET,
"4");
+
results.get(0).assertAttributeEquals(ParquetAttribute.FILE_RANGE_END_OFFSET,
"8301");
+ results.get(0).assertContentEquals("");
+
+ results.get(1).assertAttributeEquals(ParquetAttribute.RECORD_COUNT,
"163");
+
results.get(1).assertAttributeEquals(ParquetAttribute.FILE_RANGE_START_OFFSET,
"8301");
+
results.get(1).assertAttributeEquals(ParquetAttribute.FILE_RANGE_END_OFFSET,
"12468");
+ results.get(1).assertContentEquals("");
+
+ results.forEach(flowFile ->
PRESERVED_ATTRIBUTES.forEach(flowFile::assertAttributeEquals));
+ }
+
+ @Test
+ public void testEmptyInput() {
+ runner.enqueue("", PRESERVED_ATTRIBUTES);
+
+ final Throwable thrownException = assertThrows(Throwable.class, () ->
runner.run());
+ assertTrue(thrownException.getMessage().contains("is not a Parquet
file"));
+ }
+
+ @Test
+ public void testUnrecognizedInput() throws IOException {
+ runner.enqueue(NOT_PARQUET_PATH, PRESERVED_ATTRIBUTES);
+
+ final Throwable thrownException = assertThrows(Throwable.class, () ->
runner.run());
+ assertTrue(thrownException.getMessage().contains("is not a Parquet
file"));
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
index df412c8fe2..5ee5f4e37f 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
@@ -16,6 +16,23 @@
*/
package org.apache.nifi.processors.parquet;
+import static
org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -26,6 +43,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.parquet.ParquetTestUtils;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
import org.apache.nifi.reporting.InitializationException;
@@ -48,24 +67,6 @@ import org.junit.jupiter.api.condition.OS;
import org.mockito.AdditionalMatchers;
import org.mockito.Mockito;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static
org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.Mockito.when;
-
@DisabledOnOs({ OS.WINDOWS })
public class FetchParquetTest {
@@ -141,6 +142,185 @@ public class FetchParquetTest {
verifyCSVRecords(flowFileContent);
}
+ @Test
+ public void testFetchParquetToCSVWithOffsetAndCount() throws IOException,
InitializationException {
+ configure(proc);
+
+ final File parquetDir = new File(DIRECTORY);
+ final File parquetFile = new
File(parquetDir,"testFetchParquetToCSV.parquet");
+ writeParquetUsers(parquetFile);
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.PATH.key(),
parquetDir.getAbsolutePath());
+ attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+ attributes.put(ParquetAttribute.RECORD_OFFSET, "3");
+ attributes.put(ParquetAttribute.RECORD_COUNT, "1");
+
+ testRunner.enqueue("TRIGGER", attributes);
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+ final MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+ flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"text/plain");
+
assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY
+ "/" + parquetFile.getName()));
+
+ // the mock record writer will write the header for each record so
replace those to get down to just the records
+ String flowFileContent = new String(flowFile.toByteArray(),
StandardCharsets.UTF_8);
+ flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+ assertEquals("Bob3,3,blue3\n", flowFileContent);
+ }
+
+ @Test
+ public void testFetchParquetToCSVWithOffset() throws IOException,
InitializationException {
+ configure(proc);
+
+ final File parquetDir = new File(DIRECTORY);
+ final File parquetFile = new
File(parquetDir,"testFetchParquetToCSV.parquet");
+ writeParquetUsers(parquetFile);
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.PATH.key(),
parquetDir.getAbsolutePath());
+ attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+ attributes.put(ParquetAttribute.RECORD_OFFSET, "9");
+
+ testRunner.enqueue("TRIGGER", attributes);
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+ final MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+ flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"text/plain");
+
assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY
+ "/" + parquetFile.getName()));
+
+ // the mock record writer will write the header for each record so
replace those to get down to just the records
+ String flowFileContent = new String(flowFile.toByteArray(),
StandardCharsets.UTF_8);
+ flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+ assertEquals("Bob9,9,blue9\n", flowFileContent);
+ }
+
+ @Test
+ public void testFetchParquetToCSVWithCount() throws IOException,
InitializationException {
+ configure(proc);
+
+ final File parquetDir = new File(DIRECTORY);
+ final File parquetFile = new
File(parquetDir,"testFetchParquetToCSV.parquet");
+ writeParquetUsers(parquetFile);
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.PATH.key(),
parquetDir.getAbsolutePath());
+ attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+ attributes.put(ParquetAttribute.RECORD_COUNT, "1");
+
+ testRunner.enqueue("TRIGGER", attributes);
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+ final MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+ flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"text/plain");
+
assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY
+ "/" + parquetFile.getName()));
+
+ // the mock record writer will write the header for each record so
replace those to get down to just the records
+ String flowFileContent = new String(flowFile.toByteArray(),
StandardCharsets.UTF_8);
+ flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+ assertEquals("Bob0,0,blue0\n", flowFileContent);
+ }
+
+ @Test
+ public void testFetchParquetToCSVWithOffsetWithinFileRange() throws
IOException, InitializationException {
+ configure(proc);
+
+ final File parquetFile = ParquetTestUtils.createUsersParquetFile(1000);
+ final File parquetDir = parquetFile.getParentFile();
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.PATH.key(),
parquetDir.getAbsolutePath());
+ attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+ attributes.put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
+ attributes.put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+ attributes.put(ParquetAttribute.RECORD_OFFSET, "325");
+
+ testRunner.enqueue("TRIGGER", attributes);
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+ final MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+ flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"text/plain");
+
assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY
+ "/" + parquetFile.getName()));
+
+ // the mock record writer will write the header for each record so
replace those to get down to just the records
+ String flowFileContent = new String(flowFile.toByteArray(),
StandardCharsets.UTF_8);
+ flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+ assertEquals("Bob988,988,blue988\n", flowFileContent);
+ }
+
+ @Test
+ public void testFetchParquetToCSVWithCountWithinFileRange() throws
IOException, InitializationException {
+ configure(proc);
+
+ final File parquetFile = ParquetTestUtils.createUsersParquetFile(1000);
+ final File parquetDir = parquetFile.getParentFile();
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.PATH.key(),
parquetDir.getAbsolutePath());
+ attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+ attributes.put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
+ attributes.put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+ attributes.put(ParquetAttribute.RECORD_COUNT, "1");
+
+ testRunner.enqueue("TRIGGER", attributes);
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+ final MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+ flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"text/plain");
+
assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY
+ "/" + parquetFile.getName()));
+
+ // the mock record writer will write the header for each record so
replace those to get down to just the records
+ String flowFileContent = new String(flowFile.toByteArray(),
StandardCharsets.UTF_8);
+ flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+ assertEquals("Bob663,663,blue663\n", flowFileContent);
+ }
+
+ @Test
+ public void testFetchParquetToCSVWithOffsetAndCountWithinFileRange()
throws IOException, InitializationException {
+ configure(proc);
+
+ final File parquetFile = ParquetTestUtils.createUsersParquetFile(1000);
+ final File parquetDir = parquetFile.getParentFile();
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.PATH.key(),
parquetDir.getAbsolutePath());
+ attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+ attributes.put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
+ attributes.put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
+ attributes.put(ParquetAttribute.RECORD_OFFSET, "3");
+ attributes.put(ParquetAttribute.RECORD_COUNT, "1");
+
+ testRunner.enqueue("TRIGGER", attributes);
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+ final MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, "1");
+ flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),
"text/plain");
+
assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY
+ "/" + parquetFile.getName()));
+
+ // the mock record writer will write the header for each record so
replace those to get down to just the records
+ String flowFileContent = new String(flowFile.toByteArray(),
StandardCharsets.UTF_8);
+ flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+ assertEquals("Bob666,666,blue666\n", flowFileContent);
+ }
+
@Test
public void testFetchWhenELEvaluatesToEmptyShouldRouteFailure() throws
InitializationException {
configure(proc);