takraj commented on code in PR #7893: URL: https://github.com/apache/nifi/pull/7893#discussion_r1402130881
########## nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetOffsets.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.parquet; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.parquet.stream.NifiParquetInputFile; +import org.apache.nifi.parquet.utils.ParquetAttribute; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.hadoop.ParquetFileReader; + +@Tags({"parquet", "split", "partition", "break apart", "efficient processing", "load balance", "cluster"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription( + "The processor generates N flow files from the input, and adds attributes with the offsets required to read " + + "the group of rows in the FlowFile's content. Can be used to increase the overall efficiency of " + + "processing extremely large Parquet files." +) +@UseCase( + description = "Multithreaded processing of extremely large Parquet files", + keywords = {"multi-threaded", "multi-core", "parallel"}, + configuration = """ + Instead of having something like + + X -> ConvertRecord (Parquet / JSON) -> ... + + Have something like + + X -> CalculateParquetOffsets -> ConvertRecord (Parquet / JSON) -> ... + + This increases the overall efficiency of this operation for extremely large + Parquet files (hundreds of GBs). With the second approach, you can leverage + multi-threading, for processing a single file. + """ +) +@UseCase( + description = "Distributed processing of extremely large Parquet files across the NiFi nodes", + keywords = {"distribute", "nodes", "cluster", "transfer", "parallel"}, + configuration = """ + Instead of having something like + + X -> ConvertRecord (Parquet / JSON) -> ... + + Have something like + + X -> CalculateParquetOffsets -> FetchParquet (JSON Writer) -> ... + + And set "Zero Content Output" to "true". + + This way, a load balanced connection could be used between CalculateParquetOffsets and FetchParquet, + in order to distribute the work across the nodes, without transferring a lot of data across + the nodes of the cluster. + """ +) +@WritesAttributes({ + @WritesAttribute( + attribute = ParquetAttribute.RECORD_OFFSET, + description = "Sets the index of first record of the parquet file." + ), + @WritesAttribute( + attribute = ParquetAttribute.RECORD_COUNT, + description = "Sets the number of records in the parquet file." + ) +}) +@ReadsAttributes({ + @ReadsAttribute( + attribute = ParquetAttribute.RECORD_OFFSET, + description = "Gets the index of first record in the input." + ), + @ReadsAttribute( + attribute = ParquetAttribute.RECORD_COUNT, + description = "Gets the number of records in the input." + ), + @ReadsAttribute( + attribute = ParquetAttribute.FILE_RANGE_START_OFFSET, + description = "Gets the start offset of the selected row group in the parquet file." + ), + @ReadsAttribute( + attribute = ParquetAttribute.FILE_RANGE_END_OFFSET, + description = "Gets the end offset of the selected row group in the parquet file." + ) +}) +@SideEffectFree +public class CalculateParquetOffsets extends AbstractProcessor { + + static final PropertyDescriptor PROP_RECORDS_PER_SPLIT = new PropertyDescriptor.Builder() + .name("Records Per Split") + .description("Specifies how many records should be covered in each FlowFile") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .build(); + + static final PropertyDescriptor PROP_ZERO_CONTENT_OUTPUT = new PropertyDescriptor.Builder() + .name("Zero Content Output") + .description("Whether to do, or do not copy the content of input FlowFile.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles, with special attributes that represent a chunk of the input file.") + .build(); + + static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of( + PROP_RECORDS_PER_SPLIT, + PROP_ZERO_CONTENT_OUTPUT + ); + + static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile original = session.get(); + if (original == null) { Review Comment: Done in 9f340c9 ########## nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java: ########## @@ -16,45 +16,79 @@ */ package org.apache.nifi.parquet.record; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.Optional; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.parquet.filter.OffsetRecordFilter; import org.apache.nifi.parquet.stream.NifiParquetInputFile; +import org.apache.nifi.parquet.utils.ParquetAttribute; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetReader.Builder; import org.apache.parquet.io.InputFile; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; - public class ParquetRecordReader implements RecordReader { private GenericRecord lastParquetRecord; - private RecordSchema recordSchema; + private final RecordSchema recordSchema; private final InputStream inputStream; - private final InputFile inputFile; private final ParquetReader<GenericRecord> parquetReader; - - public ParquetRecordReader(final InputStream inputStream, final long inputLength, final Configuration configuration) throws IOException { + private final Long count; Review Comment: Done in 9f340c9 ########## nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java: ########## @@ -35,11 +35,18 @@ public class AvroParquetHDFSRecordReader implements HDFSRecordReader { private GenericRecord lastRecord; private RecordSchema recordSchema; private boolean initialized = false; + private final Long count; + private long recordsRead = 0; private final ParquetReader<GenericRecord> parquetReader; public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> parquetReader) { + this(parquetReader, null); + } + + public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> parquetReader, Long count) { Review Comment: Done in 9f340c9 ########## nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java: ########## @@ -63,9 +79,35 @@ public class FetchParquet extends AbstractFetchHDFSRecord { @Override public HDFSRecordReader createHDFSRecordReader(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path) throws IOException { + final Long offset = Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.RECORD_OFFSET)) + .map(Long::parseLong) + .orElse(null); + + final Long count = Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.RECORD_COUNT)) + .map(Long::parseLong) + .orElse(null); + + final long fileStartOffset = Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_START_OFFSET)) + .map(Long::parseLong) + .orElse(0L); + final long fileEndOffset = Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_END_OFFSET)) + .map(Long::parseLong) + .orElse(Long.MAX_VALUE); + final InputFile inputFile = HadoopInputFile.fromPath(path, conf); - final ParquetReader.Builder<GenericRecord> readerBuilder = AvroParquetReader.<GenericRecord>builder(inputFile).withConf(conf); - return new AvroParquetHDFSRecordReader(readerBuilder.build()); + final ParquetReader.Builder<GenericRecord> readerBuilder = AvroParquetReader.<GenericRecord>builder(inputFile) + .withConf(conf) + .withFileRange(fileStartOffset, fileEndOffset); + + if (offset != null) { + readerBuilder.withFilter(FilterCompat.get(OffsetRecordFilter.offset(offset))); + } + + if (count == null) { + return new AvroParquetHDFSRecordReader(readerBuilder.build()); + } else { + return new AvroParquetHDFSRecordReader(readerBuilder.build(), count); + } Review Comment: Done in 9f340c9 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
