[ https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653908#comment-16653908 ]
ASF GitHub Bot commented on NIFI-5706:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3079#discussion_r226020300

--- Diff: nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java ---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"avro", "parquet", "convert"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts Avro records into Parquet file format. The incoming FlowFile should be a valid Avro file. "
+        + "If an incoming FlowFile does not contain any records, an empty Parquet file is output. "
+        + "NOTE: Many Avro datatypes (e.g. collections, primitives, and unions of primitives) can be converted to Parquet, "
+        + "but unions of collections and other complex datatypes may not be convertible to Parquet.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "Sets the filename to the existing filename with its extension replaced by, or appended with, .parquet"),
+        @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the Parquet file.")
+})
+public class ConvertAvroToParquet extends AbstractProcessor {
+
+    // Attributes
+    public static final String FILE_NAME_ATTRIBUTE = "file.name";
+    public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+    public static final List<AllowableValue> COMPRESSION_TYPES;
+    static {
+        final List<AllowableValue> compressionTypes = new ArrayList<>();
+        for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) {
+            final String name = compressionCodecName.name();
+            compressionTypes.add(new AllowableValue(name, name));
+        }
+        COMPRESSION_TYPES = Collections.unmodifiableList(compressionTypes);
+    }
+
+    // Parquet properties
+    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
+            .name("compression-type")
+            .displayName("Compression Type")
+            .description("The type of compression for the file being written.")
+            .allowableValues(COMPRESSION_TYPES.toArray(new AllowableValue[0]))
+            .defaultValue(COMPRESSION_TYPES.get(0).getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
+            .name("row-group-size")
+            .displayName("Row Group Size")
+            .description("The row group size used by the Parquet writer. "
+                    + "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("The page size used by the Parquet writer. "
+                    + "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("dictionary-page-size")
+            .displayName("Dictionary Page Size")
+            .description("The dictionary page size used by the Parquet writer. "
+                    + "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
+            .name("max-padding-size")
+            .displayName("Max Padding Size")
+            .description("The maximum amount of padding that will be used to align row groups with blocks in the "
+                    + "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. "
+                    + "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
+            .name("enable-dictionary-encoding")
+            .displayName("Enable Dictionary Encoding")
+            .description("Specifies whether dictionary encoding should be enabled for the Parquet writer")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder()
+            .name("enable-validation")
+            .displayName("Enable Validation")
+            .description("Specifies whether validation should be enabled for the Parquet writer")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder()
+            .name("writer-version")
+            .displayName("Writer Version")
+            .description("Specifies the version used by the Parquet writer")
+            .allowableValues(ParquetProperties.WriterVersion.values())
+            .build();
+
+    public static final PropertyDescriptor REMOVE_CRC_FILES = new PropertyDescriptor.Builder()
+            .name("remove-crc-files")
+            .displayName("Remove CRC Files")
+            .description("Specifies whether the corresponding CRC file should be deleted upon successfully writing a Parquet file")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    private volatile List<PropertyDescriptor> parquetProps;
+
+    // Relationships
+    static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Parquet file that was converted successfully from Avro")
+            .build();
+
+    static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Avro content that could not be processed")
+            .build();
+
+    static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship>builder()
+            .add(SUCCESS)
+            .add(FAILURE)
+            .build();
+
+    @Override
+    protected final void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+
+        props.add(COMPRESSION_TYPE);
+        props.add(ROW_GROUP_SIZE);
+        props.add(PAGE_SIZE);
+        props.add(DICTIONARY_PAGE_SIZE);
+        props.add(MAX_PADDING_SIZE);
+        props.add(ENABLE_DICTIONARY_ENCODING);
+        props.add(ENABLE_VALIDATION);
+        props.add(WRITER_VERSION);
+        props.add(REMOVE_CRC_FILES);
+
+        this.parquetProps = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return parquetProps;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        try {
+            long startTime = System.currentTimeMillis();
+            final AtomicInteger totalRecordCount = new AtomicInteger(0);
+
+            final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+            FlowFile putFlowFile = flowFile;
+
+            // Stream the Avro records and write them back out as Parquet within the flowfile content
+            putFlowFile = session.write(flowFile, (rawIn, rawOut) -> {
+                try (final InputStream in = new BufferedInputStream(rawIn);
+                     final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, new GenericDatumReader<>())) {
+
+                    Schema avroSchema = dataFileReader.getSchema();
+                    ParquetWriter<GenericRecord> writer = createParquetWriter(context, flowFile, rawOut, avroSchema);
+
+                    try {
+                        int recordCount = 0;
+                        GenericRecord record = null;
+                        while (dataFileReader.hasNext()) {
+                            record = dataFileReader.next();
+                            writer.write(record);
+                            recordCount++;
+                        }
+                        totalRecordCount.set(recordCount);
+                    } finally {
+                        writer.close();
+                    }
+                }
+            });
+
+            // Add attributes and transfer to success
+            putFlowFile = session.putAttribute(putFlowFile, RECORD_COUNT_ATTRIBUTE, Integer.toString(totalRecordCount.get()));
+            StringBuilder newFilename = new StringBuilder();
+            int extensionIndex = fileName.lastIndexOf(".");
+            if (extensionIndex != -1) {
+                newFilename.append(fileName.substring(0, extensionIndex));
+            } else {
+                newFilename.append(fileName);
+            }
+            newFilename.append(".parquet");
+            putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename.toString());
+            session.transfer(putFlowFile, SUCCESS);
+            session.getProvenanceReporter().modifyContent(putFlowFile, "Converted " + totalRecordCount.get() + " records", System.currentTimeMillis() - startTime);
+
+        } catch (final ProcessException pe) {
+            getLogger().error("Failed to convert {} from Avro to Parquet due to {}; transferring to failure", new Object[]{flowFile, pe});
+            session.transfer(flowFile, FAILURE);
+        }
+    }
+
+    private ParquetWriter<GenericRecord> createParquetWriter(final ProcessContext context, final FlowFile flowFile, final OutputStream out, final Schema schema)
+            throws IOException {
+
+        NifiParquetOutputFile nifiParquetOutputFile = new NifiParquetOutputFile(out);
+
+        final AvroParquetWriter.Builder<GenericRecord> parquetWriter = AvroParquetWriter
+                .<GenericRecord>builder(nifiParquetOutputFile)
+                .withSchema(schema);
+
+        Configuration conf = new Configuration();
+        conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
+        conf.setBoolean("parquet.avro.add-list-element-records", false);
+        conf.setBoolean("parquet.avro.write-old-list-structure", false);
+
+        applyCommonConfig(parquetWriter, context, flowFile, conf);
+
+        return parquetWriter.build();
+    }
+
+    private void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
--- End diff --

Could we create a ParquetUtils class and move this method there so it can be shared between PutParquet and ConvertAvroToParquet?
> Processor ConvertAvroToParquet
> -------------------------------
>
>                 Key: NIFI-5706
>                 URL: https://issues.apache.org/jira/browse/NIFI-5706
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.7.1
>            Reporter: Mohit
>            Priority: Major
>              Labels: pull-request-available
>
> *Why?*
> PutParquet support is limited to HDFS.
> PutParquet bypasses the _flowfile_ implementation and writes the file directly to the sink.
> We need a processor for Parquet that works like _ConvertAvroToOrc_.
> *What?*
> _ConvertAvroToParquet_ will convert the incoming Avro flowfile to a Parquet flowfile. Unlike PutParquet, which writes to the HDFS file system, ConvertAvroToParquet writes the Parquet output into the flowfile itself, which can then be pipelined to other sinks, like _local_ disk, _S3_, _Azure Data Lake_, etc.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
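To make the usage described in the issue concrete, here is a minimal sketch that drives the processor with NiFi's mock framework (nifi-mock). The input path, compression codec choice, and the use of a main method are placeholders for illustration; the class sits in the processor's package because the relationship constants are package-private.

    // Hypothetical usage sketch, assuming a sample Avro file on disk.
    package org.apache.nifi.processors.parquet;

    import java.nio.file.Paths;
    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class ConvertAvroToParquetExample {
        public static void main(String[] args) throws Exception {
            final TestRunner runner = TestRunners.newTestRunner(ConvertAvroToParquet.class);
            runner.setProperty(ConvertAvroToParquet.COMPRESSION_TYPE, "SNAPPY");

            // Enqueue an Avro file as the incoming flowfile content
            runner.enqueue(Paths.get("src/test/resources/sample.avro"));
            runner.run();

            // The converted Parquet content stays in the flowfile, so it can be
            // routed on to any downstream sink (local disk, S3, Azure Data Lake, ...)
            runner.assertAllFlowFilesTransferred(ConvertAvroToParquet.SUCCESS, 1);
            final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);
            System.out.println("records: " + out.getAttribute("record.count"));
            System.out.println("filename: " + out.getAttribute("filename"));
        }
    }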