http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java new file mode 100644 index 0000000..934eb59 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.parquet; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord; +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter; +import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordWriter; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"put", "parquet", "hadoop", "HDFS", "filesystem", "restricted"}) +@CapabilityDescription("Reads records from an incoming 
FlowFile using the provided Record Reader, and writes those records " +
+        "to a Parquet file. The schema for the Parquet file must be provided in the processor properties. This processor will " +
+        "first write a temporary dot file and upon successfully writing every record to the dot file, it will rename the " +
+        "dot file to its final name. If the dot file cannot be renamed, the rename operation will be attempted up to 10 times, and " +
+        "if still not successful, the dot file will be deleted and the flow file will be routed to failure. " +
+        "If any error occurs while reading records from the input, or writing records to the output, " +
+        "the entire dot file will be removed and the flow file will be routed to failure or retry, depending on the error.")
+@ReadsAttribute(attribute = "filename", description = "The name of the file to write comes from the value of this attribute.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The name of the file is stored in this attribute."),
+        @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file is stored in this attribute."),
+        @WritesAttribute(attribute = "record.count", description = "The number of records written to the Parquet file.")
+})
+@Restricted("Provides an operator the ability to write to any file that NiFi has access to in HDFS or the local filesystem.")
+public class PutParquet extends AbstractPutHDFSRecord {
+
+    public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
+            .name("row-group-size")
+            .displayName("Row Group Size")
+            .description("The row group size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("The page size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("dictionary-page-size")
+            .displayName("Dictionary Page Size")
+            .description("The dictionary page size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
+            .name("max-padding-size")
+            .displayName("Max Padding Size")
+            .description("The maximum amount of padding that will be used to align row groups with blocks in the " +
+                    "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. 
" + + "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder() + .name("enable-dictionary-encoding") + .displayName("Enable Dictionary Encoding") + .description("Specifies whether dictionary encoding should be enabled for the Parquet writer") + .allowableValues("true", "false") + .build(); + + public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder() + .name("enable-validation") + .displayName("Enable Validation") + .description("Specifies whether validation should be enabled for the Parquet writer") + .allowableValues("true", "false") + .build(); + + public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder() + .name("writer-version") + .displayName("Writer Version") + .description("Specifies the version used by Parquet writer") + .allowableValues(ParquetProperties.WriterVersion.values()) + .build(); + + public static final PropertyDescriptor REMOVE_CRC_FILES = new PropertyDescriptor.Builder() + .name("remove-crc-files") + .displayName("Remove CRC Files") + .description("Specifies whether the corresponding CRC file should be deleted upon successfully writing a Parquet file") + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final List<AllowableValue> COMPRESSION_TYPES; + static { + final List<AllowableValue> compressionTypes = new ArrayList<>(); + for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) { + final String name = compressionCodecName.name(); + compressionTypes.add(new AllowableValue(name, name)); + } + COMPRESSION_TYPES = Collections.unmodifiableList(compressionTypes); + } + + @Override + public List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context) { + return COMPRESSION_TYPES; + } + + @Override + public String getDefaultCompressionType(final ProcessorInitializationContext context) { + return CompressionCodecName.UNCOMPRESSED.name(); + } + + @Override + public List<PropertyDescriptor> getAdditionalProperties() { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(ROW_GROUP_SIZE); + props.add(PAGE_SIZE); + props.add(DICTIONARY_PAGE_SIZE); + props.add(MAX_PADDING_SIZE); + props.add(ENABLE_DICTIONARY_ENCODING); + props.add(ENABLE_VALIDATION); + props.add(WRITER_VERSION); + props.add(REMOVE_CRC_FILES); + return Collections.unmodifiableList(props); + } + + @Override + public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path, final RecordSchema schema) + throws IOException, SchemaNotFoundException { + + final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema); + + final AvroParquetWriter.Builder<GenericRecord> parquetWriter = AvroParquetWriter + .<GenericRecord>builder(path) + .withSchema(avroSchema); + + applyCommonConfig(parquetWriter, context, flowFile, conf); + + return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema); + } + + private void applyCommonConfig(final ParquetWriter.Builder builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) { + builder.withConf(conf); + + // Required properties + + final boolean overwrite = context.getProperty(OVERWRITE).asBoolean(); + final ParquetFileWriter.Mode 
mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE; + builder.withWriteMode(mode); + + final PropertyDescriptor compressionTypeDescriptor = getPropertyDescriptor(COMPRESSION_TYPE.getName()); + final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue(); + + final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue); + builder.withCompressionCodec(codecName); + + // Optional properties + + if (context.getProperty(ROW_GROUP_SIZE).isSet()){ + try { + final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + if (rowGroupSize != null) { + builder.withRowGroupSize(rowGroupSize.intValue()); + } + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e); + } + } + + if (context.getProperty(PAGE_SIZE).isSet()) { + try { + final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + if (pageSize != null) { + builder.withPageSize(pageSize.intValue()); + } + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e); + } + } + + if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) { + try { + final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + if (dictionaryPageSize != null) { + builder.withDictionaryPageSize(dictionaryPageSize.intValue()); + } + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e); + } + } + + if (context.getProperty(MAX_PADDING_SIZE).isSet()) { + try { + final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + if (maxPaddingSize != null) { + builder.withMaxPaddingSize(maxPaddingSize.intValue()); + } + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e); + } + } + + if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) { + final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean(); + builder.withDictionaryEncoding(enableDictionaryEncoding); + } + + if (context.getProperty(ENABLE_VALIDATION).isSet()) { + final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean(); + builder.withValidation(enableValidation); + } + + if (context.getProperty(WRITER_VERSION).isSet()) { + final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue(); + builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue)); + } + } + + @Override + protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) { + final boolean removeCRCFiles = context.getProperty(REMOVE_CRC_FILES).asBoolean(); + if (removeCRCFiles) { + final String filename = destFile.getName(); + final String hdfsPath = destFile.getParent().toString(); + + final Path crcFile = new Path(hdfsPath, "." + filename + ".crc"); + deleteQuietly(getFileSystem(), crcFile); + } + + return flowFile; + } +}
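The builder calls issued by applyCommonConfig() above map directly onto Parquet's public writer API, so the same configuration can be exercised outside of NiFi. Below is a minimal standalone sketch of that mapping; the class name, the /tmp output path, the one-field schema literal, and the chosen codec and sizes are illustrative stand-ins for the processor's property values, not part of this commit:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class AvroParquetWriteSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative single-field schema; PutParquet instead resolves its schema
            // through the configured schema access strategy.
            final Schema schema = new Schema.Parser().parse(
                    "{\"type\": \"record\", \"name\": \"User\", \"fields\": ["
                            + "{\"name\": \"name\", \"type\": \"string\"}]}");

            // The same builder calls applyCommonConfig() makes, with example values
            // standing in for the processor properties.
            final AvroParquetWriter.Builder<GenericRecord> builder = AvroParquetWriter
                    .<GenericRecord>builder(new Path("/tmp/users.parquet")) // hypothetical output path
                    .withSchema(schema)
                    .withConf(new Configuration())
                    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)   // what Overwrite = true selects
                    .withCompressionCodec(CompressionCodecName.SNAPPY) // Compression Type property
                    .withRowGroupSize(128 * 1024 * 1024)               // Row Group Size, in bytes
                    .withPageSize(1024 * 1024);                        // Page Size, in bytes

            try (final ParquetWriter<GenericRecord> writer = builder.build()) {
                final GenericRecord user = new GenericData.Record(schema);
                user.put("name", "Bob");
                writer.write(user); // the per-record call AvroParquetHDFSRecordWriter.write() makes
            }
        }
    }

Because the builder only accepts byte counts, PutParquet converts each "<Data Size> <Data Unit>" property value with asDataSize(DataUnit.B) before calling the corresponding withXxx method, which is why the tests below expect flow files carrying values like "NOT A DATA SIZE" to be routed to failure.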
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordReader.java new file mode 100644 index 0000000..8421e37 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordReader.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.parquet.record; + +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.processors.hadoop.record.HDFSRecordReader; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.parquet.hadoop.ParquetReader; + +import java.io.IOException; +import java.util.Map; + +/** + * HDFSRecordReader that reads Parquet files using Avro. 
+ */ +public class AvroParquetHDFSRecordReader implements HDFSRecordReader { + + private GenericRecord lastRecord; + private RecordSchema recordSchema; + private boolean initialized = false; + + private final ParquetReader<GenericRecord> parquetReader; + + public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> parquetReader) { + this.parquetReader = parquetReader; + } + + @Override + public Record nextRecord() throws IOException { + if (initialized && lastRecord == null) { + return null; + } + + lastRecord = parquetReader.read(); + initialized = true; + + if (lastRecord == null) { + return null; + } + + if (recordSchema == null) { + recordSchema = AvroTypeUtil.createSchema(lastRecord.getSchema()); + } + + final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(lastRecord, recordSchema); + return new MapRecord(recordSchema, values); + } + + + @Override + public void close() throws IOException { + parquetReader.close(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordWriter.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordWriter.java new file mode 100644 index 0000000..7ef37b1 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordWriter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.parquet.record; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter; +import org.apache.nifi.serialization.record.Record; +import org.apache.parquet.hadoop.ParquetWriter; + +import java.io.IOException; + +/** + * HDFSRecordWriter that writes Parquet files using Avro as the schema representation. 
+ */ +public class AvroParquetHDFSRecordWriter implements HDFSRecordWriter { + + private final Schema avroSchema; + private final ParquetWriter<GenericRecord> parquetWriter; + + public AvroParquetHDFSRecordWriter(final ParquetWriter<GenericRecord> parquetWriter, final Schema avroSchema) { + this.avroSchema = avroSchema; + this.parquetWriter = parquetWriter; + } + + @Override + public void write(final Record record) throws IOException { + final GenericRecord genericRecord = AvroTypeUtil.createAvroRecord(record, avroSchema); + parquetWriter.write(genericRecord); + } + + @Override + public void close() throws IOException { + parquetWriter.close(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..36583e3 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.parquet.PutParquet +org.apache.nifi.processors.parquet.FetchParquet \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java new file mode 100644 index 0000000..a4f1513 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.parquet; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.hadoop.record.HDFSRecordReader; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.when; + +public class FetchParquetTest { + + static final String DIRECTORY = "target"; + static final String TEST_CONF_PATH = "src/test/resources/core-site.xml"; + static final String RECORD_HEADER = "name,favorite_number,favorite_color"; + + private Schema schema; + private Configuration testConf; + private FetchParquet proc; + private TestRunner testRunner; + + @Before + public void setup() throws IOException, InitializationException { + final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/avro/user.avsc"), StandardCharsets.UTF_8); + schema = new Schema.Parser().parse(avroSchema); + + testConf = new Configuration(); + testConf.addResource(new Path(TEST_CONF_PATH)); + + proc = new FetchParquet(); + } + + private void configure(final FetchParquet fetchParquet) throws InitializationException { + testRunner = TestRunners.newTestRunner(fetchParquet); + testRunner.setProperty(FetchParquet.HADOOP_CONFIGURATION_RESOURCES, TEST_CONF_PATH); + + final RecordSetWriterFactory writerFactory = new MockRecordWriter(RECORD_HEADER, false); + testRunner.addControllerService("mock-writer-factory", writerFactory); + testRunner.enableControllerService(writerFactory); + testRunner.setProperty(FetchParquet.RECORD_WRITER, "mock-writer-factory"); + } + + @Test + public void testFetchParquetToCSV() throws IOException, InitializationException { + configure(proc); + + final File parquetDir = new File(DIRECTORY); + final File parquetFile = new 
File(parquetDir,"testFetchParquetToCSV.parquet"); + final int numUsers = 10; + writeParquetUsers(parquetFile, numUsers); + + final Map<String,String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath()); + attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName()); + + testRunner.enqueue("TRIGGER", attributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1); + + final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, String.valueOf(numUsers)); + flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain"); + + // the mock record writer will write the header for each record so replace those to get down to just the records + String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8); + flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", ""); + + verifyCSVRecords(numUsers, flowFileContent); + } + + @Test + public void testFetchWhenELEvaluatesToEmptyShouldRouteFailure() throws InitializationException { + configure(proc); + testRunner.setProperty(FetchParquet.FILENAME, "${missing.attr}"); + + testRunner.enqueue("TRIGGER"); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_FAILURE, 1); + + final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_FAILURE).get(0); + flowFile.assertAttributeEquals(FetchParquet.FETCH_FAILURE_REASON_ATTR, "Can not create a Path from an empty string"); + flowFile.assertContentEquals("TRIGGER"); + } + + @Test + public void testFetchWhenDoesntExistShouldRouteToFailure() throws InitializationException { + configure(proc); + + final String filename = "/tmp/does-not-exist-" + System.currentTimeMillis(); + testRunner.setProperty(FetchParquet.FILENAME, filename); + + testRunner.enqueue("TRIGGER"); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_FAILURE, 1); + + final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_FAILURE).get(0); + flowFile.assertAttributeEquals(FetchParquet.FETCH_FAILURE_REASON_ATTR, "File " + filename + " does not exist"); + flowFile.assertContentEquals("TRIGGER"); + } + + @Test + public void testIOExceptionCreatingReaderShouldRouteToRetry() throws InitializationException, IOException { + final FetchParquet proc = new FetchParquet() { + @Override + public HDFSRecordReader createHDFSRecordReader(ProcessContext context, FlowFile flowFile, Configuration conf, Path path) + throws IOException { + throw new IOException("IOException"); + } + }; + + configure(proc); + + final File parquetDir = new File(DIRECTORY); + final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet"); + final int numUsers = 10; + writeParquetUsers(parquetFile, numUsers); + + final Map<String,String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath()); + attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName()); + + testRunner.enqueue("TRIGGER", attributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_RETRY, 1); + + final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_RETRY).get(0); + flowFile.assertContentEquals("TRIGGER"); + } + + @Test + public void testIOExceptionWhileReadingShouldRouteToRetry() throws IOException, InitializationException { + final 
FetchParquet proc = new FetchParquet() { + @Override + public HDFSRecordReader createHDFSRecordReader(ProcessContext context, FlowFile flowFile, Configuration conf, Path path) + throws IOException { + return new HDFSRecordReader() { + @Override + public Record nextRecord() throws IOException { + throw new IOException("IOException"); + } + @Override + public void close() throws IOException { + } + }; + } + }; + + configure(proc); + + final File parquetDir = new File(DIRECTORY); + final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet"); + final int numUsers = 10; + writeParquetUsers(parquetFile, numUsers); + + final Map<String,String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath()); + attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName()); + + testRunner.enqueue("TRIGGER", attributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_RETRY, 1); + + final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_RETRY).get(0); + flowFile.assertContentEquals("TRIGGER"); + } + + @Test + public void testIOExceptionWhileWritingShouldRouteToRetry() throws InitializationException, IOException, SchemaNotFoundException { + configure(proc); + + final RecordSetWriter recordSetWriter = Mockito.mock(RecordSetWriter.class); + when(recordSetWriter.write(any(RecordSet.class), any(OutputStream.class))).thenThrow(new IOException("IOException")); + + final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class); + when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory"); + when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(FlowFile.class), any(InputStream.class))).thenReturn(recordSetWriter); + + testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory); + testRunner.enableControllerService(recordSetWriterFactory); + testRunner.setProperty(FetchParquet.RECORD_WRITER, "mock-writer-factory"); + + final File parquetDir = new File(DIRECTORY); + final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet"); + final int numUsers = 10; + writeParquetUsers(parquetFile, numUsers); + + final Map<String,String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath()); + attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName()); + + testRunner.enqueue("TRIGGER", attributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_RETRY, 1); + + final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_RETRY).get(0); + flowFile.assertContentEquals("TRIGGER"); + } + + protected void verifyCSVRecords(int numUsers, String csvContent) { + final String[] splits = csvContent.split("[\\n]"); + Assert.assertEquals(numUsers, splits.length); + + for (int i=0; i < numUsers; i++) { + final String line = splits[i]; + Assert.assertEquals("Bob" + i + "," + i + ",blue" + i, line); + } + } + + private void writeParquetUsers(final File parquetFile, int numUsers) throws IOException { + if (parquetFile.exists()) { + Assert.assertTrue(parquetFile.delete()); + } + + final Path parquetPath = new Path(parquetFile.getPath()); + + final AvroParquetWriter.Builder<GenericRecord> writerBuilder = AvroParquetWriter + .<GenericRecord>builder(parquetPath) + .withSchema(schema) + .withConf(testConf); + + try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) { + for (int i=0; i 
< numUsers; i++) { + final GenericRecord user = new GenericData.Record(schema); + user.put("name", "Bob" + i); + user.put("favorite_number", i); + user.put("favorite_color", "blue" + i); + + writer.write(user); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java new file mode 100644 index 0000000..3a07dce --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java @@ -0,0 +1,669 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.parquet; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.hadoop.exception.FailureException; +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.when; + + +public class PutParquetTest { + + static final String DIRECTORY = "target"; + static final String TEST_CONF_PATH = "src/test/resources/core-site.xml"; + + private Schema schema; + private Configuration testConf; + private PutParquet proc; + private MockRecordParser readerFactory; + private TestRunner testRunner; + + + @Before + public void setup() throws IOException, InitializationException { + final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/avro/user.avsc"), StandardCharsets.UTF_8); + schema = new Schema.Parser().parse(avroSchema); + + testConf = new Configuration(); + testConf.addResource(new Path(TEST_CONF_PATH)); + + proc = new PutParquet(); + } + + private void configure(final PutParquet putParquet, final int numUsers) throws InitializationException { + testRunner = TestRunners.newTestRunner(putParquet); + testRunner.setProperty(PutParquet.HADOOP_CONFIGURATION_RESOURCES, TEST_CONF_PATH); + testRunner.setProperty(PutParquet.DIRECTORY, DIRECTORY); + + readerFactory = new MockRecordParser(); + + final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); + for (final RecordField recordField : recordSchema.getFields()) { + readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType()); + } + + for (int i=0; i < numUsers; i++) { + readerFactory.addRecord("name" + i, i, "blue" + i); + } + + testRunner.addControllerService("mock-reader-factory", readerFactory); + 
testRunner.enableControllerService(readerFactory); + + testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory"); + testRunner.setProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue()); + testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, schema.toString()); + } + + @Test + public void testWriteAvroParquetWithDefaults() throws IOException, InitializationException { + configure(proc, 100); + + final String filename = "testWriteAvroWithDefaults-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1); + + final Path avroParquetFile = new Path(DIRECTORY + "/" + filename); + + // verify the successful flow file has the expected attributes + final MockFlowFile mockFlowFile = testRunner.getFlowFilesForRelationship(PutParquet.REL_SUCCESS).get(0); + mockFlowFile.assertAttributeEquals(PutParquet.ABSOLUTE_HDFS_PATH_ATTRIBUTE, avroParquetFile.getParent().toString()); + mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename); + mockFlowFile.assertAttributeEquals(PutParquet.RECORD_COUNT_ATTR, "100"); + + // verify we generated a provenance event + final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents(); + Assert.assertEquals(1, provEvents.size()); + + // verify it was a SEND event with the correct URI + final ProvenanceEventRecord provEvent = provEvents.get(0); + Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType()); + Assert.assertEquals("hdfs://" + avroParquetFile.toString(), provEvent.getTransitUri()); + + // verify the content of the parquet file by reading it back in + verifyAvroParquetUsers(avroParquetFile, 100); + + // verify we don't have the temp dot file after success + final File tempAvroParquetFile = new File(DIRECTORY + "/." + filename); + Assert.assertFalse(tempAvroParquetFile.exists()); + + // verify we DO have the CRC file after success + final File crcAvroParquetFile = new File(DIRECTORY + "/." + filename + ".crc"); + Assert.assertTrue(crcAvroParquetFile.exists()); + } + + @Test + public void testWriteAvroAndRemoveCRCFiles() throws IOException, InitializationException { + configure(proc,100); + testRunner.setProperty(PutParquet.REMOVE_CRC_FILES, "true"); + + final String filename = "testWriteAvroAndRemoveCRCFiles-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1); + + // verify we don't have the temp dot file after success + final File tempAvroParquetFile = new File(DIRECTORY + "/." + filename); + Assert.assertFalse(tempAvroParquetFile.exists()); + + // verify we don't have the CRC file after success because we set remove to true + final File crcAvroParquetFile = new File(DIRECTORY + "/." 
+ filename + ".crc"); + Assert.assertFalse(crcAvroParquetFile.exists()); + } + + @Test + public void testWriteAvroWithGZIPCompression() throws IOException, InitializationException { + configure(proc, 100); + testRunner.setProperty(PutParquet.COMPRESSION_TYPE, CompressionCodecName.GZIP.name()); + + final String filename = "testWriteAvroWithGZIPCompression-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1); + + // verify the content of the parquet file by reading it back in + final Path avroParquetFile = new Path(DIRECTORY + "/" + filename); + verifyAvroParquetUsers(avroParquetFile, 100); + } + + @Test + public void testInvalidAvroShouldRouteToFailure() throws InitializationException, SchemaNotFoundException, MalformedRecordException, IOException { + configure(proc, 0); + + // simulate throwing an IOException when the factory creates a reader which is what would happen when + // invalid Avro is passed to the Avro reader factory + final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class); + when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory"); + when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenThrow(new IOException("NOT AVRO")); + + testRunner.addControllerService("mock-reader-factory", readerFactory); + testRunner.enableControllerService(readerFactory); + testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory"); + + final String filename = "testInvalidAvroShouldRouteToFailure-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1); + } + + @Test + public void testCreateDirectoryIOExceptionShouldRouteToRetry() throws InitializationException, IOException { + final PutParquet proc = new PutParquet() { + @Override + protected void createDirectory(FileSystem fileSystem, Path directory, String remoteOwner, String remoteGroup) + throws IOException, FailureException { + throw new IOException("IOException creating directory"); + } + }; + + configure(proc, 10); + + final String filename = "testCreateDirectoryIOExceptionShouldRouteToRetry-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_RETRY, 1); + } + + @Test + public void testCreateDirectoryFailureExceptionShouldRouteToFailure() throws InitializationException, IOException { + final PutParquet proc = new PutParquet() { + @Override + protected void createDirectory(FileSystem fileSystem, Path directory, String remoteOwner, String remoteGroup) + throws IOException, FailureException { + throw new FailureException("FailureException creating directory"); + } + }; + + configure(proc, 10); + + final String filename = "testCreateDirectoryFailureExceptionShouldRouteToFailure-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + 
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testDestinationExistsWithoutOverwriteShouldRouteFailure() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.OVERWRITE, "false");
+
+        final String filename = "testDestinationExistsWithoutOverwriteShouldRouteFailure-" + System.currentTimeMillis();
+
+        // create a file in the directory with the same name
+        final File avroParquetFile = new File(DIRECTORY + "/" + filename);
+        Assert.assertTrue(avroParquetFile.createNewFile());
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testTempDestinationExistsWithoutOverwriteShouldRouteFailure() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.OVERWRITE, "false");
+
+        // use the dot filename
+        final String filename = ".testTempDestinationExistsWithoutOverwriteShouldRouteFailure-" + System.currentTimeMillis();
+
+        // create a file in the directory with the same name
+        final File avroParquetFile = new File(DIRECTORY + "/" + filename);
+        Assert.assertTrue(avroParquetFile.createNewFile());
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testDestinationExistsWithOverwriteShouldBeSuccessful() throws InitializationException, IOException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.OVERWRITE, "true");
+
+        final String filename = "testDestinationExistsWithOverwriteShouldBeSuccessful-" + System.currentTimeMillis();
+
+        // create a file in the directory with the same name
+        final File avroParquetFile = new File(DIRECTORY + "/" + filename);
+        Assert.assertTrue(avroParquetFile.createNewFile());
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testValidSchemaWithELShouldBeSuccessful() throws InitializationException, IOException {
+        configure(proc, 10);
+        testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
+
+        final String filename = "testValidSchemaWithELShouldBeSuccessful-" + System.currentTimeMillis();
+
+        // provide my.schema as an attribute so the expression language resolves to a valid schema
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        flowFileAttributes.put("my.schema", schema.toString());
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testSchemaWithELMissingShouldRouteToFailure() throws InitializationException, IOException {
+        configure(proc, 10);
+        testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
+
+        final String filename = "testSchemaWithELMissingShouldRouteToFailure-" + System.currentTimeMillis();
+
+        // don't provide my.schema as an attribute
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testInvalidSchemaShouldRouteToFailure() throws InitializationException, IOException {
+        configure(proc, 10);
+        testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
+
+        final String filename = "testInvalidSchemaShouldRouteToFailure-" + System.currentTimeMillis();
+
+        // provide an invalid schema through the my.schema attribute
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        flowFileAttributes.put("my.schema", "NOT A SCHEMA");
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
+        configure(proc, 10);
+
+        final RecordReader recordReader = Mockito.mock(RecordReader.class);
+        when(recordReader.nextRecord()).thenThrow(new MalformedRecordException("ERROR"));
+
+        final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
+        when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
+        when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+        testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory");
+
+        final String filename = "testMalformedRecordExceptionFromReaderShouldRouteToFailure-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testIOExceptionCreatingWriterShouldRouteToRetry() throws InitializationException, IOException, MalformedRecordException {
+        final PutParquet proc = new PutParquet() {
+            @Override
+            public HDFSRecordWriter createHDFSRecordWriter(ProcessContext context, FlowFile flowFile, Configuration conf, Path path, RecordSchema schema)
+                    throws IOException, SchemaNotFoundException {
+                throw new IOException("IOException");
+            }
+        };
+        configure(proc, 0);
+
+        final String filename = "testIOExceptionCreatingWriterShouldRouteToRetry-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testIOExceptionFromReaderShouldRouteToRetry() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
+        configure(proc, 10);
+
+        final RecordSet recordSet = Mockito.mock(RecordSet.class);
+        when(recordSet.next()).thenThrow(new IOException("ERROR"));
+
+        final RecordReader recordReader = Mockito.mock(RecordReader.class);
+        when(recordReader.createRecordSet()).thenReturn(recordSet);
+
+        final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
+        when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
+        when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+        testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory");
+
+        final String filename = "testIOExceptionFromReaderShouldRouteToRetry-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testIOExceptionRenamingShouldRouteToRetry() throws InitializationException, IOException {
+        final PutParquet proc = new PutParquet() {
+            @Override
+            protected void rename(FileSystem fileSystem, Path srcFile, Path destFile)
+                    throws IOException, InterruptedException, FailureException {
+                throw new IOException("IOException renaming");
+            }
+        };
+
+        configure(proc, 10);
+
+        final String filename = "testIOExceptionRenamingShouldRouteToRetry-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_RETRY, 1);
+
+        // verify we don't have the temp dot file after the failed rename
+        final File tempAvroParquetFile = new File(DIRECTORY + "/." + filename);
+        Assert.assertFalse(tempAvroParquetFile.exists());
+    }
+
+    @Test
+    public void testFailureExceptionRenamingShouldRouteToFailure() throws InitializationException, IOException {
+        final PutParquet proc = new PutParquet() {
+            @Override
+            protected void rename(FileSystem fileSystem, Path srcFile, Path destFile)
+                    throws IOException, InterruptedException, FailureException {
+                throw new FailureException("FailureException renaming");
+            }
+        };
+
+        configure(proc, 10);
+
+        final String filename = "testFailureExceptionRenamingShouldRouteToFailure-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+
+        // verify we don't have the temp dot file after the failed rename
+        final File tempAvroParquetFile = new File(DIRECTORY + "/." 
+ filename); + Assert.assertFalse(tempAvroParquetFile.exists()); + } + + @Test + public void testRowGroupSize() throws IOException, InitializationException { + configure(proc, 10); + testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "1024 B"); + + final String filename = "testRowGroupSize-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1); + } + + @Test + public void testInvalidRowGroupSizeFromELShouldRouteToFailure() throws IOException, InitializationException { + configure(proc, 10); + testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "${row.group.size}"); + + final String filename = "testInvalidRowGroupSizeFromELShouldRouteToFailure" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + flowFileAttributes.put("row.group.size", "NOT A DATA SIZE"); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1); + } + + @Test + public void testPageSize() throws IOException, InitializationException { + configure(proc, 10); + testRunner.setProperty(PutParquet.PAGE_SIZE, "1024 B"); + + final String filename = "testPageGroupSize-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1); + } + + @Test + public void testInvalidPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException { + configure(proc, 10); + testRunner.setProperty(PutParquet.PAGE_SIZE, "${page.size}"); + + final String filename = "testInvalidPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + flowFileAttributes.put("page.size", "NOT A DATA SIZE"); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1); + } + + @Test + public void testDictionaryPageSize() throws IOException, InitializationException { + configure(proc, 10); + testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "1024 B"); + + final String filename = "testDictionaryPageGroupSize-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1); + } + + @Test + public void testInvalidDictionaryPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException { + configure(proc, 10); + testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}"); + + final String filename = "testInvalidDictionaryPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + 
flowFileAttributes.put("dictionary.page.size", "NOT A DATA SIZE"); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1); + } + + @Test + public void testMaxPaddingPageSize() throws IOException, InitializationException { + configure(proc, 10); + testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "1024 B"); + + final String filename = "testMaxPaddingSize-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1); + } + + @Test + public void testInvalidMaxPaddingSizeFromELShouldRouteToFailure() throws IOException, InitializationException { + configure(proc, 10); + testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "${max.padding.size}"); + + final String filename = "testInvalidMaxPaddingSizeFromELShouldRouteToFailure" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + flowFileAttributes.put("max.padding.size", "NOT A DATA SIZE"); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1); + } + + @Test + public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException { + configure(proc, 0); + + // add the favorite color as a string + readerFactory.addRecord("name0", "0", "blue0"); + + final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1); + + final Path avroParquetFile = new Path(DIRECTORY + "/" + filename); + + // verify the content of the parquet file by reading it back in + verifyAvroParquetUsers(avroParquetFile, 1); + } + + private void verifyAvroParquetUsers(final Path avroParquetUsers, final int numExpectedUsers) throws IOException { + final ParquetReader.Builder<GenericRecord> readerBuilder = AvroParquetReader + .<GenericRecord>builder(avroParquetUsers) + .withConf(testConf); + + int currUser = 0; + + try (final ParquetReader<GenericRecord> reader = readerBuilder.build()) { + GenericRecord nextRecord; + while((nextRecord = reader.read()) != null) { + Assert.assertNotNull(nextRecord); + Assert.assertEquals("name" + currUser, nextRecord.get("name").toString()); + Assert.assertEquals(currUser, nextRecord.get("favorite_number")); + Assert.assertEquals("blue" + currUser, nextRecord.get("favorite_color").toString()); + currUser++; + } + } + + Assert.assertEquals(numExpectedUsers, currUser); + } + + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user.avsc b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user.avsc new file mode 100644 index 0000000..470827a --- /dev/null +++ 
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user.avsc b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user.avsc
new file mode 100644
index 0000000..470827a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user.avsc
@@ -0,0 +1,9 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number", "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}
\ No newline at end of file
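For orientation, the unions in user.avsc make favorite_number and favorite_color nullable, while name is required. A small sketch against this schema using the plain Avro API (the class name and resource loading are illustrative):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class UserSchemaExample {
        public static void main(String[] args) throws Exception {
            // Parse the same schema the tests ship as a resource
            final Schema schema = new Schema.Parser().parse(
                    UserSchemaExample.class.getResourceAsStream("/avro/user.avsc"));

            final GenericRecord user = new GenericData.Record(schema);
            user.put("name", "name0");        // plain string, required
            user.put("favorite_number", 0);   // ["int", "null"] accepts an int...
            user.put("favorite_color", null); // ...or null, as does the color union
            System.out.println(user);
        }
    }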
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/core-site.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/core-site.xml
new file mode 100644
index 0000000..234ecf2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/core-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>file:///</value>
+    </property>
+</configuration>
\ No newline at end of file
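The only property this file sets, fs.defaultFS of file:///, points the Hadoop FileSystem client at the local filesystem, which is what lets the tests above exercise PutParquet without a running HDFS cluster. A minimal illustration with the stock Hadoop API (class name is ours):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class LocalFileSystemExample {
        public static void main(String[] args) throws Exception {
            final Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///"); // same override as the core-site.xml above

            // Resolves to the local filesystem implementation, so the dot-file
            // writes and renames in the tests land in a plain local directory.
            final FileSystem fs = FileSystem.get(conf);
            System.out.println(fs.getUri()); // file:///
        }
    }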
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/pom.xml
new file mode 100644
index 0000000..2b8248c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-parquet-bundle</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-parquet-processors</module>
+        <module>nifi-parquet-nar</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
index 3cfae30..a3a1c14 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
@@ -32,6 +32,10 @@
             <artifactId>nifi-record-serialization-service-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
index fc01a18..af0f8c8 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
@@ -43,6 +43,10 @@
             <artifactId>nifi-record-serialization-service-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-all</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 295ae96..d0a1f1c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -54,6 +54,10 @@
             <artifactId>nifi-record-serialization-service-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
@@ -225,6 +229,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.derby</groupId>
             <artifactId>derby</artifactId>
             <scope>test</scope>
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 355f192..c31cdd4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -18,8 +18,8 @@ package org.apache.nifi.processors.standard
 import org.apache.nifi.processor.exception.ProcessException
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure
-import org.apache.nifi.processors.standard.util.record.MockRecordParser
 import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.serialization.record.MockRecordParser
 import org.apache.nifi.serialization.record.RecordField
 import org.apache.nifi.serialization.record.RecordFieldType
 import org.apache.nifi.serialization.record.RecordSchema

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index 0dcaeec..de3a428 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -17,17 +17,17 @@
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertTrue;
-
-import org.apache.nifi.processors.standard.util.record.MockRecordParser;
-import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 public class TestConvertRecord {
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 32c3635..70d3e87 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -16,23 +16,15 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.standard.util.record.MockRecordParser;
-import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSet;
@@ -42,6 +34,14 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 public class TestQueryRecord {
 
     static {
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
index 4c3bff4..e2f5005 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
@@ -17,20 +17,20 @@
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-import org.apache.nifi.processors.standard.util.record.MockRecordParser;
-import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestSplitRecord {
 
     @Test
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
deleted file mode 100644
index fcf0d10..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard.util.record;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
-    private final List<Object[]> records = new ArrayList<>();
-    private final List<RecordField> fields = new ArrayList<>();
-    private final int failAfterN;
-
-    public MockRecordParser() {
-        this(-1);
-    }
-
-    public MockRecordParser(final int failAfterN) {
-        this.failAfterN = failAfterN;
-    }
-
-
-    public void addSchemaField(final String fieldName, final RecordFieldType type) {
-        fields.add(new RecordField(fieldName, type.getDataType()));
-    }
-
-    public void addRecord(Object... values) {
-        records.add(values);
-    }
-
-    @Override
-    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
-        final Iterator<Object[]> itr = records.iterator();
-
-        return new RecordReader() {
-            private int recordCount = 0;
-
-            @Override
-            public void close() throws IOException {
-            }
-
-            @Override
-            public Record nextRecord() throws IOException, MalformedRecordException {
-                if (failAfterN >= recordCount) {
-                    throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
-                }
-                recordCount++;
-
-                if (!itr.hasNext()) {
-                    return null;
-                }
-
-                final Object[] values = itr.next();
-                final Map<String, Object> valueMap = new HashMap<>();
-                int i = 0;
-                for (final RecordField field : fields) {
-                    final String fieldName = field.getFieldName();
-                    valueMap.put(fieldName, values[i++]);
-                }
-
-                return new MapRecord(new SimpleRecordSchema(fields), valueMap);
-            }
-
-            @Override
-            public RecordSchema getSchema() {
-                return new SimpleRecordSchema(fields);
-            }
-        };
-    }
-}
\ No newline at end of file
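The deletion above completes the relocation visible in the import changes earlier: the mock now lives at org.apache.nifi.serialization.record.MockRecordParser in the shared nifi-mock-record-utils test artifact that the poms above add. Based on the API shown in the deleted file, typical wiring in a processor test looks roughly like this (a sketch; the "mock-reader" identifier and the surrounding TestRunner are illustrative):

    import org.apache.nifi.serialization.record.MockRecordParser;
    import org.apache.nifi.serialization.record.RecordFieldType;

    // inside a test method, given an existing TestRunner named testRunner:
    final MockRecordParser readerFactory = new MockRecordParser();
    readerFactory.addSchemaField("name", RecordFieldType.STRING);
    readerFactory.addSchemaField("favorite_number", RecordFieldType.INT);
    readerFactory.addSchemaField("favorite_color", RecordFieldType.STRING);
    readerFactory.addRecord("name0", 0, "blue0");

    testRunner.addControllerService("mock-reader", readerFactory);
    testRunner.enableControllerService(readerFactory);
    testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader");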
