Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3079#discussion_r226021329
--- Diff: nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java ---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"avro", "parquet", "convert"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts Avro records into Parquet file format.
The incoming FlowFile should be a valid avro file. If an incoming FlowFile does
"
+ + "not contain any records, an empty parquet file is the output.
NOTE: Many Avro datatypes (collections, primitives, and unions of primitives,
e.g.) can "
+ + "be converted to parquet, but unions of collections and other
complex datatypes may not be able to be converted to Parquet.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "filename", description = "Sets the
filename to the existing filename with the extension replaced by / added to by
.parquet"),
+ @WritesAttribute(attribute = "record.count", description = "Sets
the number of records in the parquet file.")
+})
+public class ConvertAvroToParquet extends AbstractProcessor {
+
+    // Attributes
+    public static final String FILE_NAME_ATTRIBUTE = "file.name";
+    public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+    public static final List<AllowableValue> COMPRESSION_TYPES;
+    static {
+        final List<AllowableValue> compressionTypes = new ArrayList<>();
+        for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) {
+            final String name = compressionCodecName.name();
+            compressionTypes.add(new AllowableValue(name, name));
+        }
+        COMPRESSION_TYPES = Collections.unmodifiableList(compressionTypes);
+    }
+
+    // Parquet properties
+    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
+            .name("compression-type")
+            .displayName("Compression Type")
+            .description("The type of compression for the file being written.")
+            .allowableValues(COMPRESSION_TYPES.toArray(new AllowableValue[0]))
+            .defaultValue(COMPRESSION_TYPES.get(0).getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
+            .name("row-group-size")
+            .displayName("Row Group Size")
+            .description("The row group size used by the Parquet writer. "
+                    + "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("The page size used by the Parquet writer. "
+                    + "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("dictionary-page-size")
+            .displayName("Dictionary Page Size")
+            .description("The dictionary page size used by the Parquet writer. "
+                    + "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
+            .name("max-padding-size")
+            .displayName("Max Padding Size")
+            .description("The maximum amount of padding that will be used to align row groups with blocks in the "
+                    + "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. "
+                    + "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
+            .name("enable-dictionary-encoding")
+            .displayName("Enable Dictionary Encoding")
+            .description("Specifies whether dictionary encoding should be enabled for the Parquet writer")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder()
+            .name("enable-validation")
+            .displayName("Enable Validation")
+            .description("Specifies whether validation should be enabled for the Parquet writer")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder()
+            .name("writer-version")
+            .displayName("Writer Version")
+            .description("Specifies the version used by the Parquet writer")
+            .allowableValues(ParquetProperties.WriterVersion.values())
+            .build();
+
+    public static final PropertyDescriptor REMOVE_CRC_FILES = new PropertyDescriptor.Builder()
+            .name("remove-crc-files")
+            .displayName("Remove CRC Files")
+            .description("Specifies whether the corresponding CRC file should be deleted upon successfully writing a Parquet file")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    private volatile List<PropertyDescriptor> parquetProps;
+
+    // Relationships
+    static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Parquet file that was converted successfully from Avro")
+            .build();
+
+    static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Avro content that could not be processed")
+            .build();
+
+    static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship>builder()
+            .add(SUCCESS)
+            .add(FAILURE)
+            .build();
+
+    @Override
+    protected final void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+
+        props.add(COMPRESSION_TYPE);
+        props.add(ROW_GROUP_SIZE);
+        props.add(PAGE_SIZE);
+        props.add(DICTIONARY_PAGE_SIZE);
+        props.add(MAX_PADDING_SIZE);
+        props.add(ENABLE_DICTIONARY_ENCODING);
+        props.add(ENABLE_VALIDATION);
+        props.add(WRITER_VERSION);
+        props.add(REMOVE_CRC_FILES);
+
+        this.parquetProps = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return parquetProps;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        try {
+            long startTime = System.currentTimeMillis();
+            final AtomicInteger totalRecordCount = new AtomicInteger(0);
+
+            final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+            FlowFile putFlowFile = flowFile;
+
+            putFlowFile = session.write(flowFile, (rawIn, rawOut) -> {
+                try (final InputStream in = new BufferedInputStream(rawIn);
+                     final OutputStream out = new BufferedOutputStream(rawOut);
+                     final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, new GenericDatumReader<>())) {
+
+                    final Schema avroSchema = dataFileReader.getSchema();
+                    // Write through the buffered stream rather than the raw one
+                    final ParquetWriter<GenericRecord> writer = createParquetWriter2(context, flowFile, out, avroSchema);
+
+                    try {
+                        int recordCount = 0;
+                        while (dataFileReader.hasNext()) {
+                            final GenericRecord record = dataFileReader.next();
+                            writer.write(record);
+                            recordCount++;
+                        }
+                        totalRecordCount.set(recordCount);
+                    } finally {
+                        writer.close();
+                    }
+                }
+            });
+
+            // Add attributes and transfer to success
+            putFlowFile = session.putAttribute(putFlowFile, RECORD_COUNT_ATTRIBUTE, Integer.toString(totalRecordCount.get()));
+            final StringBuilder newFilename = new StringBuilder();
+            final int extensionIndex = fileName.lastIndexOf(".");
+            if (extensionIndex != -1) {
+                newFilename.append(fileName.substring(0, extensionIndex));
+            } else {
+                newFilename.append(fileName);
+            }
+            newFilename.append(".parquet");
+            putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename.toString());
--- End diff ---

This is a minor point, but it would be more efficient to create a Map<String, String> with the record count and filename, and then make a single call to session.putAllAttributes.
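
For illustration, a minimal sketch of that suggestion (it reuses the totalRecordCount and newFilename locals from the diff above, and assumes java.util.Map and java.util.HashMap are imported; putAllAttributes is the existing ProcessSession method):

    final Map<String, String> attributes = new HashMap<>();
    attributes.put(RECORD_COUNT_ATTRIBUTE, Integer.toString(totalRecordCount.get()));
    attributes.put(CoreAttributes.FILENAME.key(), newFilename.toString());
    // One FlowFile mutation instead of two separate putAttribute calls
    putFlowFile = session.putAllAttributes(putFlowFile, attributes);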
---