[ https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653903#comment-16653903 ]
ASF GitHub Bot commented on NIFI-5706: -------------------------------------- Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/3079#discussion_r226017678 --- Diff: nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java --- @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nifi.processors.parquet; + + +import com.google.common.collect.ImmutableSet; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; 
+import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"avro", "parquet", "convert"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Converts Avro records into Parquet file format. The incoming FlowFile should be a valid avro file. If an incoming FlowFile does " + + "not contain any records, an empty parquet file is the output. NOTE: Many Avro datatypes (collections, primitives, and unions of primitives, e.g.) can " + + "be converted to parquet, but unions of collections and other complex datatypes may not be able to be converted to Parquet.") +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "Sets the filename to the existing filename with the extension replaced by / added to by .parquet"), + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the parquet file.") +}) +public class ConvertAvroToParquet extends AbstractProcessor { + + // Attributes + public static final String FILE_NAME_ATTRIBUTE = "file.name"; --- End diff -- This appears to be unused. > Processor ConvertAvroToParquet > ------------------------------- > > Key: NIFI-5706 > URL: https://issues.apache.org/jira/browse/NIFI-5706 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Affects Versions: 1.7.1 > Reporter: Mohit > Priority: Major > Labels: pull-request-available > > *Why*? > PutParquet support is limited to HDFS. > PutParquet bypasses the _flowfile_ implementation and writes the file > directly to the sink. > We need a processor for parquet that works like _ConvertAvroToOrc_. > *What*? > _ConvertAvroToParquet_ will convert the incoming avro flowfile to a parquet > flowfile. Unlike PutParquet, which writes to the HDFS file system, processor > ConvertAvroToParquet would write into the flowfile, which can be pipelined to > put into other sinks, such as _local_, _S3_, _Azure data lake_, etc. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)