[
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653905#comment-16653905
]
ASF GitHub Bot commented on NIFI-5706:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3079#discussion_r226017591
--- Diff:
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"avro", "parquet", "convert"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts Avro records into Parquet file format.
The incoming FlowFile should be a valid avro file. If an incoming FlowFile does
"
+ + "not contain any records, an empty parquet file is the output.
NOTE: Many Avro datatypes (collections, primitives, and unions of primitives,
e.g.) can "
+ + "be converted to parquet, but unions of collections and other
complex datatypes may not be able to be converted to Parquet.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "filename", description = "Sets the
filename to the existing filename with the extension replaced by / added to by
.parquet"),
+ @WritesAttribute(attribute = "record.count", description = "Sets
the number of records in the parquet file.")
+})
+public class ConvertAvroToParquet extends AbstractProcessor {
+
+ // Attributes
+ public static final String FILE_NAME_ATTRIBUTE = "file.name";
+ public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+ public static final List<AllowableValue> COMPRESSION_TYPES;
+ static {
+ final List<AllowableValue> compressionTypes = new ArrayList<>();
+ for (CompressionCodecName compressionCodecName :
CompressionCodecName.values()) {
+ final String name = compressionCodecName.name();
+ compressionTypes.add(new AllowableValue(name, name));
+ }
+ COMPRESSION_TYPES = Collections.unmodifiableList(compressionTypes);
+ }
+
+
+ // Parquet properties
+ public static final PropertyDescriptor COMPRESSION_TYPE = new
PropertyDescriptor.Builder()
+ .name("compression-type")
+ .displayName("Compression Type")
+ .description("The type of compression for the file being
written.")
+ .allowableValues(COMPRESSION_TYPES.toArray(new
AllowableValue[0]))
+ .defaultValue(COMPRESSION_TYPES.get(0).getValue())
+ .required(true)
+ .build();
+
+
+ public static final PropertyDescriptor ROW_GROUP_SIZE = new
PropertyDescriptor.Builder()
+ .name("row-group-size")
+ .displayName("Row Group Size")
+ .description("The row group size used by the Parquet writer. "
+
+ "The value is specified in the format of <Data Size>
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor PAGE_SIZE = new
PropertyDescriptor.Builder()
+ .name("page-size")
+ .displayName("Page Size")
+ .description("The page size used by the Parquet writer. " +
+ "The value is specified in the format of <Data Size>
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new
PropertyDescriptor.Builder()
+ .name("dictionary-page-size")
+ .displayName("Dictionary Page Size")
+ .description("The dictionary page size used by the Parquet
writer. " +
+ "The value is specified in the format of <Data Size>
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor MAX_PADDING_SIZE = new
PropertyDescriptor.Builder()
+ .name("max-padding-size")
+ .displayName("Max Padding Size")
+ .description("The maximum amount of padding that will be used
to align row groups with blocks in the " +
+ "underlying filesystem. If the underlying filesystem
is not a block filesystem like HDFS, this has no effect. " +
+ "The value is specified in the format of <Data Size>
<Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING =
new PropertyDescriptor.Builder()
+ .name("enable-dictionary-encoding")
+ .displayName("Enable Dictionary Encoding")
+ .description("Specifies whether dictionary encoding should be
enabled for the Parquet writer")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final PropertyDescriptor ENABLE_VALIDATION = new
PropertyDescriptor.Builder()
+ .name("enable-validation")
+ .displayName("Enable Validation")
+ .description("Specifies whether validation should be enabled
for the Parquet writer")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final PropertyDescriptor WRITER_VERSION = new
PropertyDescriptor.Builder()
+ .name("writer-version")
+ .displayName("Writer Version")
+ .description("Specifies the version used by Parquet writer")
+ .allowableValues(ParquetProperties.WriterVersion.values())
+ .build();
+
+ public static final PropertyDescriptor REMOVE_CRC_FILES = new
PropertyDescriptor.Builder()
--- End diff --
This seems to be unused in this processor
> Processor ConvertAvroToParquet
> -------------------------------
>
> Key: NIFI-5706
> URL: https://issues.apache.org/jira/browse/NIFI-5706
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.7.1
> Reporter: Mohit
> Priority: Major
> Labels: pull-request-available
>
> *Why*?
> PutParquet support is limited to HDFS.
> PutParquet bypasses the _flowfile_ implementation and writes the file
> directly to the sink.
> We need a processor for Parquet that works like _ConvertAvroToOrc_.
> *What*?
> _ConvertAvroToParquet_ will convert the incoming Avro flowfile to a Parquet
> flowfile. Unlike PutParquet, which writes directly to the HDFS file system,
> ConvertAvroToParquet would write into the flowfile itself, which can then be
> pipelined to other sinks, e.g. _local_, _S3_, _Azure Data Lake_, etc.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)