[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653904#comment-16653904
 ] 

ASF GitHub Bot commented on NIFI-5706:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3079#discussion_r226018942
  
    --- Diff: 
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
 ---
    @@ -0,0 +1,374 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.parquet;
    +
    +
    +import com.google.common.collect.ImmutableSet;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
    +import org.apache.parquet.avro.AvroParquetWriter;
    +import org.apache.parquet.avro.AvroReadSupport;
    +import org.apache.parquet.column.ParquetProperties;
    +import org.apache.parquet.hadoop.ParquetFileWriter;
    +import org.apache.parquet.hadoop.ParquetWriter;
    +import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.io.BufferedInputStream;
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +@Tags({"avro", "parquet", "convert"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Converts Avro records into Parquet file format. 
The incoming FlowFile should be a valid avro file. If an incoming FlowFile does 
"
    +        + "not contain any records, an empty parquet file is the output. 
NOTE: Many Avro datatypes (collections, primitives, and unions of primitives, 
e.g.) can "
    +        + "be converted to parquet, but unions of collections and other 
complex datatypes may not be able to be converted to Parquet.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "filename", description = "Sets the 
filename to the existing filename with the extension replaced by / added to by 
.parquet"),
    +        @WritesAttribute(attribute = "record.count", description = "Sets 
the number of records in the parquet file.")
    +})
    +public class ConvertAvroToParquet extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String FILE_NAME_ATTRIBUTE = "file.name";
    +    public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
    +
    +    public static final List<AllowableValue> COMPRESSION_TYPES;
    +    static {
    +        final List<AllowableValue> compressionTypes = new ArrayList<>();
    +        for (CompressionCodecName compressionCodecName : 
CompressionCodecName.values()) {
    +            final String name = compressionCodecName.name();
    +            compressionTypes.add(new AllowableValue(name, name));
    +        }
    +        COMPRESSION_TYPES = Collections.unmodifiableList(compressionTypes);
    +    }
    +
    +
    +    // Parquet properties
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new 
PropertyDescriptor.Builder()
    +            .name("compression-type")
    +            .displayName("Compression Type")
    +            .description("The type of compression for the file being 
written.")
    +            .allowableValues(COMPRESSION_TYPES.toArray(new 
AllowableValue[0]))
    +            .defaultValue(COMPRESSION_TYPES.get(0).getValue())
    +            .required(true)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor ROW_GROUP_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("row-group-size")
    +            .displayName("Row Group Size")
    +            .description("The row group size used by the Parquet writer. " 
+
    +                    "The value is specified in the format of <Data Size> 
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor PAGE_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("page-size")
    +            .displayName("Page Size")
    +            .description("The page size used by the Parquet writer. " +
    +                    "The value is specified in the format of <Data Size> 
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("dictionary-page-size")
    +            .displayName("Dictionary Page Size")
    +            .description("The dictionary page size used by the Parquet 
writer. " +
    +                    "The value is specified in the format of <Data Size> 
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_PADDING_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("max-padding-size")
    +            .displayName("Max Padding Size")
    +            .description("The maximum amount of padding that will be used 
to align row groups with blocks in the " +
    +                    "underlying filesystem. If the underlying filesystem 
is not a block filesystem like HDFS, this has no effect. " +
    +                    "The value is specified in the format of <Data Size> 
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = 
new PropertyDescriptor.Builder()
    +            .name("enable-dictionary-encoding")
    +            .displayName("Enable Dictionary Encoding")
    +            .description("Specifies whether dictionary encoding should be 
enabled for the Parquet writer")
    +            .allowableValues("true", "false")
    +            .build();
    +
    +    public static final PropertyDescriptor ENABLE_VALIDATION = new 
PropertyDescriptor.Builder()
    +            .name("enable-validation")
    +            .displayName("Enable Validation")
    +            .description("Specifies whether validation should be enabled 
for the Parquet writer")
    +            .allowableValues("true", "false")
    +            .build();
    +
    +    public static final PropertyDescriptor WRITER_VERSION = new 
PropertyDescriptor.Builder()
    +            .name("writer-version")
    +            .displayName("Writer Version")
    +            .description("Specifies the version used by Parquet writer")
    +            .allowableValues(ParquetProperties.WriterVersion.values())
    +            .build();
    +
    +    public static final PropertyDescriptor REMOVE_CRC_FILES = new 
PropertyDescriptor.Builder()
    +            .name("remove-crc-files")
    +            .displayName("Remove CRC Files")
    +            .description("Specifies whether the corresponding CRC file 
should be deleted upon successfully writing a Parquet file")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +    private volatile List<PropertyDescriptor> parquetProps;
    +
    +    // Relationships
    +    static final Relationship SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Parquet file that was converted successfully 
from Avro")
    +            .build();
    +
    +    static final Relationship FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Avro content that could not be processed")
    +            .build();
    +
    +    static final Set<Relationship> RELATIONSHIPS
    +            = ImmutableSet.<Relationship>builder()
    +            .add(SUCCESS)
    +            .add(FAILURE)
    +            .build();
    +
    +    @Override
    +    protected final void init(final ProcessorInitializationContext 
context) {
    +
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +
    +        props.add(COMPRESSION_TYPE);
    +        props.add(ROW_GROUP_SIZE);
    +        props.add(PAGE_SIZE);
    +        props.add(DICTIONARY_PAGE_SIZE);
    +        props.add(MAX_PADDING_SIZE);
    +        props.add(ENABLE_DICTIONARY_ENCODING);
    +        props.add(ENABLE_VALIDATION);
    +        props.add(WRITER_VERSION);
    +        props.add(REMOVE_CRC_FILES);
    +
    +        this.parquetProps = Collections.unmodifiableList(props);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return parquetProps;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        try {
    +
    +            long startTime = System.currentTimeMillis();
    +            final AtomicInteger totalRecordCount = new AtomicInteger(0);
    +
    +            final String fileName = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
    +
    +            FlowFile putFlowFile = flowFile;
    +
    +            putFlowFile = session.write(flowFile, (rawIn, rawOut) -> {
    +                try (final InputStream in = new BufferedInputStream(rawIn);
    +                     final OutputStream out = new 
BufferedOutputStream(rawOut);
    +                     final DataFileStream<GenericRecord> dataFileReader = 
new DataFileStream<>(in, new GenericDatumReader<>())) {
    +
    +                    Schema avroSchema = dataFileReader.getSchema();
    +                    System.out.println(avroSchema.toString(true));
    --- End diff --
    
    Should be removed, or replaced with logger.debug


> Processor ConvertAvroToParquet 
> -------------------------------
>
>                 Key: NIFI-5706
>                 URL: https://issues.apache.org/jira/browse/NIFI-5706
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.7.1
>            Reporter: Mohit
>            Priority: Major
>              Labels: pull-request-available
>
> *Why*?
> PutParquet support is limited to HDFS. 
> PutParquet bypasses the _flowfile_ implementation and writes the file 
> directly to sink. 
> We need a processor for parquet that works like _ConvertAvroToOrc_.
> *What*?
> _ConvertAvroToParquet_ will convert the incoming avro flowfile to a parquet 
> flowfile. Unlike PutParquet, which writes to the hdfs file system, processor 
> ConvertAvroToParquet would write into the flowfile, which can be pipelined to 
> put into other sinks, like _local_, _S3, Azure data lake_ etc.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to