[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653906#comment-16653906
 ] 

ASF GitHub Bot commented on NIFI-5706:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3079#discussion_r226019319
  
    --- Diff: 
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
 ---
    @@ -0,0 +1,374 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.parquet;
    +
    +
    +import com.google.common.collect.ImmutableSet;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
    +import org.apache.parquet.avro.AvroParquetWriter;
    +import org.apache.parquet.avro.AvroReadSupport;
    +import org.apache.parquet.column.ParquetProperties;
    +import org.apache.parquet.hadoop.ParquetFileWriter;
    +import org.apache.parquet.hadoop.ParquetWriter;
    +import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.io.BufferedInputStream;
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +@Tags({"avro", "parquet", "convert"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Converts Avro records into Parquet file format. 
The incoming FlowFile should be a valid avro file. If an incoming FlowFile does 
"
    +        + "not contain any records, an empty parquet file is the output. 
NOTE: Many Avro datatypes (collections, primitives, and unions of primitives, 
e.g.) can "
    +        + "be converted to parquet, but unions of collections and other 
complex datatypes may not be able to be converted to Parquet.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "filename", description = "Sets the 
filename to the existing filename with the extension replaced by / added to by 
.parquet"),
    +        @WritesAttribute(attribute = "record.count", description = "Sets 
the number of records in the parquet file.")
    +})
    +public class ConvertAvroToParquet extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String FILE_NAME_ATTRIBUTE = "file.name";
    +    public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
    +
    +    public static final List<AllowableValue> COMPRESSION_TYPES;
    +    static {
    +        final List<AllowableValue> compressionTypes = new ArrayList<>();
    +        for (CompressionCodecName compressionCodecName : 
CompressionCodecName.values()) {
    +            final String name = compressionCodecName.name();
    +            compressionTypes.add(new AllowableValue(name, name));
    +        }
    +        COMPRESSION_TYPES = Collections.unmodifiableList(compressionTypes);
    +    }
    +
    +
    +    // Parquet properties
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new 
PropertyDescriptor.Builder()
    +            .name("compression-type")
    +            .displayName("Compression Type")
    +            .description("The type of compression for the file being 
written.")
    +            .allowableValues(COMPRESSION_TYPES.toArray(new 
AllowableValue[0]))
    +            .defaultValue(COMPRESSION_TYPES.get(0).getValue())
    +            .required(true)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor ROW_GROUP_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("row-group-size")
    +            .displayName("Row Group Size")
    +            .description("The row group size used by the Parquet writer. " 
+
    +                    "The value is specified in the format of <Data Size> 
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("page-size")
    +            .displayName("Page Size")
    +            .description("The page size used by the Parquet writer. " +
    +                    "The value is specified in the format of <Data Size> 
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("dictionary-page-size")
    +            .displayName("Dictionary Page Size")
    +            .description("The dictionary page size used by the Parquet 
writer. " +
    +                    "The value is specified in the format of <Data Size> 
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_PADDING_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("max-padding-size")
    +            .displayName("Max Padding Size")
    +            .description("The maximum amount of padding that will be used 
to align row groups with blocks in the " +
    +                    "underlying filesystem. If the underlying filesystem 
is not a block filesystem like HDFS, this has no effect. " +
    +                    "The value is specified in the format of <Data Size> 
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = 
new PropertyDescriptor.Builder()
    +            .name("enable-dictionary-encoding")
    +            .displayName("Enable Dictionary Encoding")
    +            .description("Specifies whether dictionary encoding should be 
enabled for the Parquet writer")
    +            .allowableValues("true", "false")
    +            .build();
    +
    +    public static final PropertyDescriptor ENABLE_VALIDATION = new 
PropertyDescriptor.Builder()
    +            .name("enable-validation")
    +            .displayName("Enable Validation")
    +            .description("Specifies whether validation should be enabled 
for the Parquet writer")
    +            .allowableValues("true", "false")
    +            .build();
    +
    +    public static final PropertyDescriptor WRITER_VERSION = new 
PropertyDescriptor.Builder()
    +            .name("writer-version")
    +            .displayName("Writer Version")
    +            .description("Specifies the version used by Parquet writer")
    +            .allowableValues(ParquetProperties.WriterVersion.values())
    +            .build();
    +
    +    public static final PropertyDescriptor REMOVE_CRC_FILES = new 
PropertyDescriptor.Builder()
    +            .name("remove-crc-files")
    +            .displayName("Remove CRC Files")
    +            .description("Specifies whether the corresponding CRC file 
should be deleted upon successfully writing a Parquet file")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +    private volatile List<PropertyDescriptor> parquetProps;
    +
    +    // Relationships
    +    static final Relationship SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Parquet file that was converted successfully 
from Avro")
    +            .build();
    +
    +    static final Relationship FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Avro content that could not be processed")
    +            .build();
    +
    +    static final Set<Relationship> RELATIONSHIPS
    +            = ImmutableSet.<Relationship>builder()
    +            .add(SUCCESS)
    +            .add(FAILURE)
    +            .build();
    +
    +    @Override
    +    protected final void init(final ProcessorInitializationContext 
context) {
    +
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +
    +        props.add(COMPRESSION_TYPE);
    +        props.add(ROW_GROUP_SIZE);
    +        props.add(PAGE_SIZE);
    +        props.add(DICTIONARY_PAGE_SIZE);
    +        props.add(MAX_PADDING_SIZE);
    +        props.add(ENABLE_DICTIONARY_ENCODING);
    +        props.add(ENABLE_VALIDATION);
    +        props.add(WRITER_VERSION);
    +        props.add(REMOVE_CRC_FILES);
    +
    +        this.parquetProps = Collections.unmodifiableList(props);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return parquetProps;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        try {
    +
    +            long startTime = System.currentTimeMillis();
    +            final AtomicInteger totalRecordCount = new AtomicInteger(0);
    +
    +            final String fileName = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
    +
    +            FlowFile putFlowFile = flowFile;
    +
    +            putFlowFile = session.write(flowFile, (rawIn, rawOut) -> {
    +                try (final InputStream in = new BufferedInputStream(rawIn);
    +                     final OutputStream out = new 
BufferedOutputStream(rawOut);
    --- End diff --
    
    When calling createParquetWriter2 it is passing in rawOut so the 
BufferedOutputStream out is never used. We should probably be passing in out, 
or if not then we can remove this line.


> Processor ConvertAvroToParquet 
> -------------------------------
>
>                 Key: NIFI-5706
>                 URL: https://issues.apache.org/jira/browse/NIFI-5706
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.7.1
>            Reporter: Mohit
>            Priority: Major
>              Labels: pull-request-available
>
> *Why*?
> PutParquet support is limited to HDFS. 
> PutParquet bypasses the _flowfile_ implementation and writes the file 
> directly to sink. 
> We need a processor for parquet that works like _ConvertAvroToOrc_.
> *What*?
> _ConvertAvroToParquet_ will convert the incoming avro flowfile to a parquet 
> flowfile. Unlike PutParquet, which writes to the hdfs file system, processor 
> ConvertAvroToParquet would write into the flowfile, which can be pipelined to 
> put into other sinks, like _local_, _S3, Azure data lake_ etc.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to