Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3079#discussion_r226019319
--- Diff:
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"avro", "parquet", "convert"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts Avro records into Parquet file format.
The incoming FlowFile should be a valid avro file. If an incoming FlowFile does
"
+ + "not contain any records, an empty parquet file is the output.
NOTE: Many Avro datatypes (collections, primitives, and unions of primitives,
e.g.) can "
+ + "be converted to parquet, but unions of collections and other
complex datatypes may not be able to be converted to Parquet.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "filename", description = "Sets the
filename to the existing filename with the extension replaced by / added to by
.parquet"),
+ @WritesAttribute(attribute = "record.count", description = "Sets
the number of records in the parquet file.")
+})
+public class ConvertAvroToParquet extends AbstractProcessor {
+
+ // Attributes
+ public static final String FILE_NAME_ATTRIBUTE = "file.name";
+ public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+ public static final List<AllowableValue> COMPRESSION_TYPES;
+ static {
+ final List<AllowableValue> compressionTypes = new ArrayList<>();
+ for (CompressionCodecName compressionCodecName :
CompressionCodecName.values()) {
+ final String name = compressionCodecName.name();
+ compressionTypes.add(new AllowableValue(name, name));
+ }
+ COMPRESSION_TYPES = Collections.unmodifiableList(compressionTypes);
+ }
+
+
+ // Parquet properties
+ public static final PropertyDescriptor COMPRESSION_TYPE = new
PropertyDescriptor.Builder()
+ .name("compression-type")
+ .displayName("Compression Type")
+ .description("The type of compression for the file being
written.")
+ .allowableValues(COMPRESSION_TYPES.toArray(new
AllowableValue[0]))
+ .defaultValue(COMPRESSION_TYPES.get(0).getValue())
+ .required(true)
+ .build();
+
+
+ public static final PropertyDescriptor ROW_GROUP_SIZE = new
PropertyDescriptor.Builder()
+ .name("row-group-size")
+ .displayName("Row Group Size")
+ .description("The row group size used by the Parquet writer. "
+
+ "The value is specified in the format of <Data Size>
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor PAGE_SIZE = new
PropertyDescriptor.Builder()
+ .name("page-size")
+ .displayName("Page Size")
+ .description("The page size used by the Parquet writer. " +
+ "The value is specified in the format of <Data Size>
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new
PropertyDescriptor.Builder()
+ .name("dictionary-page-size")
+ .displayName("Dictionary Page Size")
+ .description("The dictionary page size used by the Parquet
writer. " +
+ "The value is specified in the format of <Data Size>
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor MAX_PADDING_SIZE = new
PropertyDescriptor.Builder()
+ .name("max-padding-size")
+ .displayName("Max Padding Size")
+ .description("The maximum amount of padding that will be used
to align row groups with blocks in the " +
+ "underlying filesystem. If the underlying filesystem
is not a block filesystem like HDFS, this has no effect. " +
+ "The value is specified in the format of <Data Size>
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING =
new PropertyDescriptor.Builder()
+ .name("enable-dictionary-encoding")
+ .displayName("Enable Dictionary Encoding")
+ .description("Specifies whether dictionary encoding should be
enabled for the Parquet writer")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final PropertyDescriptor ENABLE_VALIDATION = new
PropertyDescriptor.Builder()
+ .name("enable-validation")
+ .displayName("Enable Validation")
+ .description("Specifies whether validation should be enabled
for the Parquet writer")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final PropertyDescriptor WRITER_VERSION = new
PropertyDescriptor.Builder()
+ .name("writer-version")
+ .displayName("Writer Version")
+ .description("Specifies the version used by Parquet writer")
+ .allowableValues(ParquetProperties.WriterVersion.values())
+ .build();
+
+ public static final PropertyDescriptor REMOVE_CRC_FILES = new
PropertyDescriptor.Builder()
+ .name("remove-crc-files")
+ .displayName("Remove CRC Files")
+ .description("Specifies whether the corresponding CRC file
should be deleted upon successfully writing a Parquet file")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ private volatile List<PropertyDescriptor> parquetProps;
+
+ // Relationships
+ static final Relationship SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Parquet file that was converted successfully
from Avro")
+ .build();
+
+ static final Relationship FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Avro content that could not be processed")
+ .build();
+
+ static final Set<Relationship> RELATIONSHIPS
+ = ImmutableSet.<Relationship>builder()
+ .add(SUCCESS)
+ .add(FAILURE)
+ .build();
+
+ @Override
+ protected final void init(final ProcessorInitializationContext
context) {
+
+
+ final List<PropertyDescriptor> props = new ArrayList<>();
+
+ props.add(COMPRESSION_TYPE);
+ props.add(ROW_GROUP_SIZE);
+ props.add(PAGE_SIZE);
+ props.add(DICTIONARY_PAGE_SIZE);
+ props.add(MAX_PADDING_SIZE);
+ props.add(ENABLE_DICTIONARY_ENCODING);
+ props.add(ENABLE_VALIDATION);
+ props.add(WRITER_VERSION);
+ props.add(REMOVE_CRC_FILES);
+
+ this.parquetProps = Collections.unmodifiableList(props);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return parquetProps;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+
+ final FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ try {
+
+ long startTime = System.currentTimeMillis();
+ final AtomicInteger totalRecordCount = new AtomicInteger(0);
+
+ final String fileName =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+ FlowFile putFlowFile = flowFile;
+
+ putFlowFile = session.write(flowFile, (rawIn, rawOut) -> {
+ try (final InputStream in = new BufferedInputStream(rawIn);
+ final OutputStream out = new
BufferedOutputStream(rawOut);
--- End diff --
When calling createParquetWriter2, `rawOut` is passed in, so the
BufferedOutputStream `out` is never used. We should probably pass in `out`
instead; if not, we can remove this line.
---