NIFI-3921: Allow Record Writers to inherit schema from Record Signed-off-by: Matt Burgess <[email protected]>
This closes #1902 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e7dcb6f6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e7dcb6f6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e7dcb6f6 Branch: refs/heads/master Commit: e7dcb6f6c5665a2a2b8b96b39b76417f92e4f190 Parents: 4ed7511 Author: Mark Payne <[email protected]> Authored: Wed Jun 7 13:42:13 2017 -0400 Committer: Matt Burgess <[email protected]> Committed: Fri Jun 9 16:13:25 2017 -0400 ---------------------------------------------------------------------- .../java/org/apache/nifi/avro/AvroTypeUtil.java | 161 ++++++++++++++++--- .../schema/access/AvroSchemaTextStrategy.java | 2 +- .../nifi/schema/access/SchemaAccessUtils.java | 10 ++ .../WriteAvroSchemaAttributeStrategy.java | 52 ++++++ .../org/apache/nifi/avro/TestAvroTypeUtil.java | 103 ++++++++++++ .../hadoop/AbstractFetchHDFSRecord.java | 60 ++++--- .../hadoop/AbstractPutHDFSRecord.java | 95 ++--------- .../serialization/record/MockRecordWriter.java | 3 +- ...onworksAttributeSchemaReferenceStrategy.java | 2 +- ...rtonworksEncodedSchemaReferenceStrategy.java | 2 +- .../schema/access/InheritSchemaFromRecord.java | 44 +++++ .../schema/access/NopSchemaAccessWriter.java | 49 ++++++ .../schema/access/SchemaAccessStrategy.java | 3 +- .../access/SchemaNamePropertyStrategy.java | 2 +- .../schema/access/SchemaTextAsAttribute.java | 60 ------- .../processors/kafka/pubsub/ConsumerLease.java | 2 +- .../kafka/pubsub/PublishKafkaRecord_0_10.java | 21 +-- .../processors/kafka/pubsub/PublisherLease.java | 4 +- .../pubsub/TestPublishKafkaRecord_0_10.java | 18 +-- .../kafka/pubsub/util/MockRecordWriter.java | 3 +- .../nifi/processors/parquet/PutParquet.java | 2 +- .../processors/parquet/FetchParquetTest.java | 3 +- .../nifi/processors/parquet/PutParquetTest.java | 68 +++----- .../record/script/ScriptedRecordSetWriter.java | 22 +-- .../script/ScriptedRecordSetWriterTest.groovy | 4 +- .../groovy/test_record_writer_inline.groovy | 2 +- .../standard/AbstractRecordProcessor.java | 41 ++--- .../standard/AbstractRouteRecord.java | 13 +- .../processors/standard/PartitionRecord.java | 12 +- .../nifi/processors/standard/QueryRecord.java | 25 +-- .../nifi/processors/standard/SplitRecord.java | 12 +- .../processors/standard/TestQueryRecord.java | 19 ++- .../serialization/RecordSetWriterFactory.java | 5 +- .../java/org/apache/nifi/avro/AvroReader.java | 2 +- .../apache/nifi/avro/AvroRecordSetWriter.java | 2 +- .../avro/EmbeddedAvroSchemaAccessStrategy.java | 2 +- .../nifi/csv/CSVHeaderSchemaStrategy.java | 2 +- .../java/org/apache/nifi/csv/CSVReader.java | 2 +- .../java/org/apache/nifi/grok/GrokReader.java | 4 +- .../org/apache/nifi/json/JsonPathReader.java | 2 +- .../org/apache/nifi/json/JsonTreeReader.java | 2 +- .../SchemaRegistryRecordSetWriter.java | 31 +++- .../serialization/SchemaRegistryService.java | 19 ++- .../nifi/text/FreeFormTextRecordSetWriter.java | 9 +- .../avro/TestWriteAvroResultWithoutSchema.java | 4 +- .../nifi/csv/TestCSVHeaderSchemaStrategy.java | 2 +- 46 files changed, 605 insertions(+), 402 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 76438c5..87139c6 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -17,7 +17,25 @@ package org.apache.nifi.avro; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.MathContext; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.avro.Conversions; +import org.apache.avro.JsonProperties; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; @@ -29,8 +47,6 @@ import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.avro.specific.SpecificRecord; import org.apache.avro.util.Utf8; -import org.apache.avro.JsonProperties; -import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.MapRecord; @@ -39,28 +55,15 @@ import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.type.MapDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.math.BigDecimal; -import java.math.MathContext; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; - public class AvroTypeUtil { private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class); public static final String AVRO_SCHEMA_FORMAT = "avro"; @@ -72,30 +75,142 @@ public class AvroTypeUtil { private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros"; private static final String LOGICAL_TYPE_DECIMAL = "decimal"; - public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException { + public static Schema extractAvroSchema(final RecordSchema recordSchema) { if (recordSchema == null) { throw new IllegalArgumentException("RecordSchema cannot be null"); } final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat(); if (!schemaFormatOption.isPresent()) { - throw new SchemaNotFoundException("No Schema Format was present in the RecordSchema"); + return buildAvroSchema(recordSchema); } final String schemaFormat = schemaFormatOption.get(); if (!schemaFormat.equals(AVRO_SCHEMA_FORMAT)) { - throw new SchemaNotFoundException("Schema provided is not in Avro format"); + return buildAvroSchema(recordSchema); } final Optional<String> textOption = recordSchema.getSchemaText(); if (!textOption.isPresent()) { - throw new SchemaNotFoundException("No Schema text was present in the RecordSchema"); + return buildAvroSchema(recordSchema); } final String text = textOption.get(); return new Schema.Parser().parse(text); } + private static Schema buildAvroSchema(final RecordSchema recordSchema) { + final List<Field> avroFields = new ArrayList<>(recordSchema.getFieldCount()); + for (final RecordField recordField : recordSchema.getFields()) { + avroFields.add(buildAvroField(recordField)); + } + + final Schema avroSchema = Schema.createRecord("nifiRecord", null, "org.apache.nifi", false, avroFields); + return avroSchema; + } + + private static Field buildAvroField(final RecordField recordField) { + final Schema schema = buildAvroSchema(recordField.getDataType(), recordField.getFieldName()); + final Field field = new Field(recordField.getFieldName(), schema, null, recordField.getDefaultValue()); + for (final String alias : recordField.getAliases()) { + field.addAlias(alias); + } + + return field; + } + + private static Schema buildAvroSchema(final DataType dataType, final String fieldName) { + final Schema schema; + + switch (dataType.getFieldType()) { + case ARRAY: + final ArrayDataType arrayDataType = (ArrayDataType) dataType; + final DataType elementDataType = arrayDataType.getElementType(); + if (RecordFieldType.BYTE.equals(elementDataType.getFieldType())) { + schema = Schema.create(Type.BYTES); + } else { + final Schema elementType = buildAvroSchema(elementDataType, fieldName); + schema = Schema.createArray(elementType); + } + break; + case BIGINT: + schema = Schema.create(Type.STRING); + break; + case BOOLEAN: + schema = Schema.create(Type.BOOLEAN); + break; + case BYTE: + schema = Schema.create(Type.INT); + break; + case CHAR: + schema = Schema.create(Type.STRING); + break; + case CHOICE: + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + final List<DataType> options = choiceDataType.getPossibleSubTypes(); + + final List<Schema> unionTypes = new ArrayList<>(options.size()); + for (final DataType option : options) { + unionTypes.add(buildAvroSchema(option, fieldName)); + } + + schema = Schema.createUnion(unionTypes); + break; + case DATE: + schema = Schema.create(Type.INT); + LogicalTypes.date().addToSchema(schema); + break; + case DOUBLE: + schema = Schema.create(Type.DOUBLE); + break; + case FLOAT: + schema = Schema.create(Type.FLOAT); + break; + case INT: + schema = Schema.create(Type.INT); + break; + case LONG: + schema = Schema.create(Type.LONG); + break; + case MAP: + schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName)); + break; + case RECORD: + final RecordDataType recordDataType = (RecordDataType) dataType; + final RecordSchema childSchema = recordDataType.getChildSchema(); + + final List<Field> childFields = new ArrayList<>(childSchema.getFieldCount()); + for (final RecordField field : childSchema.getFields()) { + childFields.add(buildAvroField(field)); + } + + schema = Schema.createRecord(fieldName + "Type", null, "org.apache.nifi", false, childFields); + break; + case SHORT: + schema = Schema.create(Type.INT); + break; + case STRING: + schema = Schema.create(Type.STRING); + break; + case TIME: + schema = Schema.create(Type.INT); + LogicalTypes.timeMillis().addToSchema(schema); + break; + case TIMESTAMP: + schema = Schema.create(Type.LONG); + LogicalTypes.timestampMillis().addToSchema(schema); + break; + default: + return null; + } + + return nullable(schema); + } + + private static Schema nullable(final Schema schema) { + return Schema.createUnion(Schema.create(Type.NULL), schema); + } + /** * Returns a DataType for the given Avro Schema * http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java index 3155909..5bf084e 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java @@ -40,7 +40,7 @@ public class AvroSchemaTextStrategy implements SchemaAccessStrategy { } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException { final String schemaText = schemaTextPropertyValue.evaluateAttributeExpressions(flowFile).getValue(); if (schemaText == null || schemaText.trim().isEmpty()) { throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Text"); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java index cab9c02..b335b11 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java @@ -43,6 +43,10 @@ public class SchemaAccessUtils { + "found at https://github.com/hortonworks/registry"); public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes", "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'"); + public static final AllowableValue INHERIT_RECORD_SCHEMA = new AllowableValue("inherit-record-schema", "Inherit Record Schema", + "The schema used to write records will be the same schema that was given to the Record when the Record was created."); + + public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder() .name("schema-registry") @@ -117,6 +121,8 @@ public class SchemaAccessUtils { public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ProcessContext context) { if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) { return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME)); + } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) { + return new InheritSchemaFromRecord(); } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) { return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT)); } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) { @@ -131,6 +137,8 @@ public class SchemaAccessUtils { public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) { return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME)); + } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) { + return new InheritSchemaFromRecord(); } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) { return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT)); } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) { @@ -145,6 +153,8 @@ public class SchemaAccessUtils { public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) { if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) { return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME)); + } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) { + return new InheritSchemaFromRecord(); } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) { return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT)); } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) { http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java new file mode 100644 index 0000000..d9be673 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schema.access; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.avro.Schema; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.serialization.record.RecordSchema; + +public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter { + + @Override + public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException { + } + + @Override + public Map<String, String> getAttributes(final RecordSchema schema) { + final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema); + final String schemaText = avroSchema.toString(); + return Collections.singletonMap("avro.schema", schemaText); + } + + @Override + public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException { + } + + @Override + public Set<SchemaField> getRequiredSchemaFields() { + return EnumSet.noneOf(SchemaField.class); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java new file mode 100644 index 0000000..fe19733 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.avro; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.Test; + +public class TestAvroTypeUtil { + + @Test + public void testCreateAvroSchemaPrimitiveTypes() throws SchemaNotFoundException { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("int", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("long", RecordFieldType.LONG.getDataType())); + fields.add(new RecordField("string", RecordFieldType.STRING.getDataType(), "hola", Collections.singleton("greeting"))); + fields.add(new RecordField("byte", RecordFieldType.BYTE.getDataType())); + fields.add(new RecordField("char", RecordFieldType.CHAR.getDataType())); + fields.add(new RecordField("short", RecordFieldType.SHORT.getDataType())); + fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType())); + fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType())); + fields.add(new RecordField("time", RecordFieldType.TIME.getDataType())); + fields.add(new RecordField("date", RecordFieldType.DATE.getDataType())); + fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType())); + + final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()); + fields.add(new RecordField("strings", arrayType)); + + final DataType mapType = RecordFieldType.MAP.getMapDataType(RecordFieldType.LONG.getDataType()); + fields.add(new RecordField("map", mapType)); + + + final List<RecordField> personFields = new ArrayList<>(); + personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + personFields.add(new RecordField("dob", RecordFieldType.DATE.getDataType())); + final RecordSchema personSchema = new SimpleRecordSchema(personFields); + final DataType personType = RecordFieldType.RECORD.getRecordDataType(personSchema); + fields.add(new RecordField("person", personType)); + + + final RecordSchema recordSchema = new SimpleRecordSchema(fields); + + final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema); + + // everything should be a union, since it's nullable. + for (final Field field : avroSchema.getFields()) { + final Schema fieldSchema = field.schema(); + assertEquals(Type.UNION, fieldSchema.getType()); + assertTrue("Field " + field.name() + " does not contain NULL type", fieldSchema.getTypes().contains(Schema.create(Type.NULL))); + } + + final RecordSchema afterConversion = AvroTypeUtil.createSchema(avroSchema); + + assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("int").get()); + assertEquals(RecordFieldType.LONG.getDataType(), afterConversion.getDataType("long").get()); + assertEquals(RecordFieldType.STRING.getDataType(), afterConversion.getDataType("string").get()); + assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("byte").get()); + assertEquals(RecordFieldType.STRING.getDataType(), afterConversion.getDataType("char").get()); + assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("short").get()); + assertEquals(RecordFieldType.DOUBLE.getDataType(), afterConversion.getDataType("double").get()); + assertEquals(RecordFieldType.FLOAT.getDataType(), afterConversion.getDataType("float").get()); + assertEquals(RecordFieldType.TIME.getDataType(), afterConversion.getDataType("time").get()); + assertEquals(RecordFieldType.DATE.getDataType(), afterConversion.getDataType("date").get()); + assertEquals(RecordFieldType.TIMESTAMP.getDataType(), afterConversion.getDataType("timestamp").get()); + assertEquals(arrayType, afterConversion.getDataType("strings").get()); + assertEquals(mapType, afterConversion.getDataType("map").get()); + assertEquals(personType, afterConversion.getDataType("person").get()); + + final RecordField stringField = afterConversion.getField("string").get(); + assertEquals("hola", stringField.getDefaultValue()); + assertEquals(Collections.singleton("greeting"), stringField.getAliases()); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java index 7cc6bb5..fbbbbf4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java @@ -16,7 +16,22 @@ */ package org.apache.nifi.processors.hadoop; -import org.apache.commons.io.input.NullInputStream; +import java.io.BufferedOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -37,29 +52,11 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.hadoop.record.HDFSRecordReader; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; -import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.util.StopWatch; -import java.io.BufferedOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - /** * Base processor for reading a data from HDFS that can be fetched into records. */ @@ -187,7 +184,6 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor { final AtomicReference<WriteResult> writeResult = new AtomicReference<>(); final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, new NullInputStream(0)); final StopWatch stopWatch = new StopWatch(true); @@ -200,22 +196,20 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor { try (final BufferedOutputStream out = new BufferedOutputStream(rawOut); final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) { - final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList()); + Record record = recordReader.nextRecord(); + final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, record == null ? null : record.getSchema()); - final RecordSet recordSet = new RecordSet() { - @Override - public RecordSchema getSchema() throws IOException { - return emptySchema; + try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) { + recordSetWriter.beginRecordSet(); + if (record != null) { + recordSetWriter.write(record); } - @Override - public Record next() throws IOException { - return recordReader.nextRecord(); + while ((record = recordReader.nextRecord()) != null) { + recordSetWriter.write(record); } - }; - try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) { - writeResult.set(recordSetWriter.write(recordSet)); + writeResult.set(recordSetWriter.finishRecordSet()); mimeTypeRef.set(recordSetWriter.getMimeType()); } } catch (Exception e) { @@ -247,7 +241,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor { } catch (final FileNotFoundException | AccessControlException e) { getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, e}); - final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage()); + final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage() == null ? e.toString() : e.getMessage()); session.transfer(failureFlowFile, REL_FAILURE); } catch (final IOException | FlowFileAccessException e) { getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to retry", new Object[] {filenameValue, originalFlowFile, e}); @@ -255,7 +249,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor { context.yield(); } catch (final Throwable t) { getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, t}); - final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, t.getMessage()); + final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, t.getMessage() == null ? t.toString() : t.getMessage()); session.transfer(failureFlowFile, REL_FAILURE); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java index 6676ee6..70a3697 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java @@ -16,6 +16,21 @@ */ package org.apache.nifi.processors.hadoop; +import java.io.BufferedInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -28,8 +43,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; @@ -42,10 +55,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.hadoop.exception.FailureException; import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException; import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter; -import org.apache.nifi.schema.access.SchemaAccessStrategy; -import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.WriteResult; @@ -53,32 +63,6 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.util.StopWatch; -import java.io.BufferedInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA; -import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES; -import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY; -import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME; -import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY; -import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY; -import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT; -import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY; - /** * Base class for processors that write Records to HDFS. */ @@ -156,18 +140,10 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { private volatile String remoteOwner; private volatile String remoteGroup; - private volatile SchemaAccessStrategy schemaAccessStrategy; private volatile Set<Relationship> putHdfsRecordRelationships; private volatile List<PropertyDescriptor> putHdfsRecordProperties; - private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList( - SCHEMA_NAME_PROPERTY, - SCHEMA_TEXT_PROPERTY, - HWX_SCHEMA_REF_ATTRIBUTES, - HWX_CONTENT_ENCODED_SCHEMA - )); - @Override protected final void init(final ProcessorInitializationContext context) { @@ -187,19 +163,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { .description("The parent directory to which files should be written. Will be created if it doesn't exist.") .build()); - final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]); - - props.add(new PropertyDescriptor.Builder() - .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY) - .description("Specifies how to obtain the schema that is to be used for writing the data.") - .allowableValues(strategies) - .defaultValue(getDefaultSchemaAccessStrategy().getValue()) - .build()); - - props.add(SCHEMA_REGISTRY); - props.add(SCHEMA_NAME); - props.add(SCHEMA_TEXT); - final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]); props.add(new PropertyDescriptor.Builder() @@ -216,18 +179,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { this.putHdfsRecordProperties = Collections.unmodifiableList(props); } - protected List<AllowableValue> getSchemaAccessStrategyValues() { - return strategyList; - } - - protected AllowableValue getDefaultSchemaAccessStrategy() { - return SCHEMA_NAME_PROPERTY; - } - - private PropertyDescriptor getSchemaAcessStrategyDescriptor() { - return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName()); - } - /** * @param context the initialization context * @return the possible compression types @@ -259,22 +210,11 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { return putHdfsRecordProperties; } - @Override - protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue(); - return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues()); - } @OnScheduled public final void onScheduled(final ProcessContext context) throws IOException { super.abstractOnScheduled(context); - final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class); - - final PropertyDescriptor descriptor = getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName()); - final String schemaAccess = context.getProperty(descriptor).getValue(); - this.schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, context); - this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue(); this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue(); @@ -365,8 +305,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { HDFSRecordWriter recordWriter = null; try (final BufferedInputStream in = new BufferedInputStream(rawIn)) { - final RecordSchema destRecordSchema = schemaAccessStrategy.getSchema(flowFile, in); - recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, destRecordSchema); // if we fail to create the RecordReader then we want to route to failure, so we need to // handle this separately from the other IOExceptions which normally route to retry @@ -379,8 +317,9 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { } final RecordSet recordSet = recordReader.createRecordSet(); - writeResult.set(recordWriter.write(recordSet)); + recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, recordReader.getSchema()); + writeResult.set(recordWriter.write(recordSet)); } catch (Exception e) { exceptionHolder.set(e); } finally { http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java index 891bbe3..9bde647 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java @@ -18,7 +18,6 @@ package org.apache.nifi.serialization.record; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.Collections; @@ -55,7 +54,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream content) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema schema) throws SchemaNotFoundException, IOException { return new SimpleRecordSchema(Collections.emptyList()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java index d1f0e8a..073a453 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java @@ -47,7 +47,7 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { final String schemaIdentifier = flowFile.getAttribute(SCHEMA_ID_ATTRIBUTE); final String schemaVersion = flowFile.getAttribute(SCHEMA_VERSION_ATTRIBUTE); final String schemaProtocol = flowFile.getAttribute(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java index de89900..b2e5a48 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java @@ -45,7 +45,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { final byte[] buffer = new byte[13]; try { StreamUtils.fillBuffer(contentStream, buffer); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java new file mode 100644 index 0000000..d1ed63d --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schema.access; + +import java.io.IOException; +import java.io.InputStream; +import java.util.EnumSet; +import java.util.Set; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.serialization.record.RecordSchema; + +public class InheritSchemaFromRecord implements SchemaAccessStrategy { + + @Override + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { + if (readSchema == null) { + throw new SchemaNotFoundException("Cannot inherit Schema from Record because no schema was found"); + } + + return readSchema; + } + + @Override + public Set<SchemaField> getSuppliedSchemaFields() { + return EnumSet.allOf(SchemaField.class); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java new file mode 100644 index 0000000..75dedc5 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schema.access; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.serialization.record.RecordSchema; + +public class NopSchemaAccessWriter implements SchemaAccessWriter { + + @Override + public void writeHeader(RecordSchema schema, OutputStream out) throws IOException { + } + + @Override + public Map<String, String> getAttributes(RecordSchema schema) { + return Collections.emptyMap(); + } + + @Override + public void validateSchema(RecordSchema schema) throws SchemaNotFoundException { + } + + @Override + public Set<SchemaField> getRequiredSchemaFields() { + return EnumSet.noneOf(SchemaField.class); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java index 68f9ecf..923eaf0 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java @@ -30,9 +30,10 @@ public interface SchemaAccessStrategy { * * @param flowFile flowfile * @param contentStream content of flowfile + * @param readSchema the schema that was read from the input FlowFile, or <code>null</code> if there was none * @return the RecordSchema for the FlowFile */ - RecordSchema getSchema(FlowFile flowFile, InputStream contentStream) throws SchemaNotFoundException, IOException; + RecordSchema getSchema(FlowFile flowFile, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException; /** * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #getSchema(FlowFile, InputStream)}. http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java index d59e5da..796e1e4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java @@ -43,7 +43,7 @@ public class SchemaNamePropertyStrategy implements SchemaAccessStrategy { } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException { final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue(); if (schemaName.trim().isEmpty()) { throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Name."); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java deleted file mode 100644 index f39bdca..0000000 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.schema.access; - -import java.io.OutputStream; -import java.util.Collections; -import java.util.EnumSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; - -import org.apache.nifi.serialization.record.RecordSchema; - -public class SchemaTextAsAttribute implements SchemaAccessWriter { - private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); - - @Override - public void writeHeader(final RecordSchema schema, final OutputStream out) { - } - - @Override - public Map<String, String> getAttributes(final RecordSchema schema) { - final Optional<String> textFormatOption = schema.getSchemaFormat(); - final Optional<String> textOption = schema.getSchemaText(); - return Collections.singletonMap(textFormatOption.get() + ".schema", textOption.get()); - } - - @Override - public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException { - final Optional<String> textFormatOption = schema.getSchemaFormat(); - if (!textFormatOption.isPresent()) { - throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text Format is not present"); - } - - final Optional<String> textOption = schema.getSchemaText(); - if (!textOption.isPresent()) { - throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text is not present"); - } - } - - @Override - public Set<SchemaField> getRequiredSchemaFields() { - return schemaFields; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index ee6b1ff..242c917 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -465,7 +465,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final RecordSchema writeSchema; try { - writeSchema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value())); + writeSchema = writerFactory.getSchema(flowFile, recordSchema); } catch (final Exception e) { logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java index e48568b..21b2e31 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java @@ -59,6 +59,7 @@ import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"}) @CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 0.10.x Producer API. " @@ -309,6 +310,8 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final long startTime = System.nanoTime(); try (final PublisherLease lease = pool.obtainPublisher()) { @@ -323,24 +326,16 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue(); - final RecordSchema schema; - final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - - try (final InputStream in = new BufferedInputStream(session.read(flowFile))) { - schema = writerFactory.getSchema(flowFile, in); - } catch (final Exception e) { - getLogger().error("Failed to determine Schema for writing messages to Kafka for {}; routing to failure", new Object[] {flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - continue; - } - try { session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream rawIn) throws IOException { try (final InputStream in = new BufferedInputStream(rawIn)) { - final RecordReader reader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class).createRecordReader(flowFile, in, getLogger()); - lease.publish(flowFile, reader, writerFactory, schema, messageKeyField, topic); + final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger()); + final RecordSet recordSet = reader.createRecordSet(); + + final RecordSchema schema = writerFactory.getSchema(flowFile, recordSet.getSchema()); + lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic); } catch (final SchemaNotFoundException | MalformedRecordException e) { throw new ProcessException(e); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index 4238956..2004346 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -33,7 +33,6 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.Record; @@ -96,7 +95,7 @@ public class PublisherLease implements Closeable { } } - void publish(final FlowFile flowFile, final RecordReader reader, final RecordSetWriterFactory writerFactory, final RecordSchema schema, + void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSetWriterFactory writerFactory, final RecordSchema schema, final String messageKeyField, final String topic) throws IOException { if (tracker == null) { tracker = new InFlightMessageTracker(); @@ -105,7 +104,6 @@ public class PublisherLease implements Closeable { final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); Record record; - final RecordSet recordSet = reader.createRecordSet(); int recordCount = 0; try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, baos)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java index 7cff2a7..5c59c66 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java @@ -42,10 +42,10 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser; import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -65,7 +65,7 @@ public class TestPublishKafkaRecord_0_10 { public void setup() throws InitializationException, IOException { mockPool = mock(PublisherPool.class); mockLease = mock(PublisherLease.class); - Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), + Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), any(String.class), any(String.class)); when(mockPool.obtainPublisher()).thenReturn(mockLease); @@ -104,7 +104,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1); - verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(0)).poison(); verify(mockLease, times(1)).close(); @@ -123,7 +123,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 3); - verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(0)).poison(); verify(mockLease, times(1)).close(); @@ -138,7 +138,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 1); - verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(1)).close(); } @@ -155,7 +155,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 3); - verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(1)).close(); } @@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 2); - verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class)); verify(mockLease, times(1)).complete(); verify(mockLease, times(0)).poison(); @@ -207,7 +207,7 @@ public class TestPublishKafkaRecord_0_10 { runner.run(); runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1); - verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(0)).poison(); verify(mockLease, times(1)).close(); @@ -241,7 +241,7 @@ public class TestPublishKafkaRecord_0_10 { runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2); runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2); - verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(1)).complete(); verify(mockLease, times(1)).close(); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java index 1549626..60e494b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java @@ -18,7 +18,6 @@ package org.apache.nifi.processors.kafka.pubsub.util; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.Collections; @@ -53,7 +52,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor } @Override - public RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(FlowFile flowFile, RecordSchema schema) throws SchemaNotFoundException, IOException { return null; } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java index 934eb59..b9d2e42 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java @@ -185,7 +185,7 @@ public class PutParquet extends AbstractPutHDFSRecord { return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema); } - private void applyCommonConfig(final ParquetWriter.Builder builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) { + private void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) { builder.withConf(conf); // Required properties http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java index 6ecfa59..ffff2a3 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java @@ -45,7 +45,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -217,7 +216,7 @@ public class FetchParquetTest { configure(proc); final RecordSetWriter recordSetWriter = Mockito.mock(RecordSetWriter.class); - when(recordSetWriter.write(any(RecordSet.class))).thenThrow(new IOException("IOException")); + when(recordSetWriter.write(any(Record.class))).thenThrow(new IOException("IOException")); final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class); when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory"); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java index 3a07dce..e634e2e 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java @@ -16,12 +16,25 @@ */ package org.apache.nifi.processors.parquet; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.BasicConfigurator; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -32,7 +45,6 @@ import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; @@ -49,21 +61,10 @@ import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.when; - public class PutParquetTest { @@ -76,6 +77,10 @@ public class PutParquetTest { private MockRecordParser readerFactory; private TestRunner testRunner; + @BeforeClass + public static void setupLogging() { + BasicConfigurator.configure(); + } @Before public void setup() throws IOException, InitializationException { @@ -108,8 +113,6 @@ public class PutParquetTest { testRunner.enableControllerService(readerFactory); testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory"); - testRunner.setProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue()); - testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, schema.toString()); } @Test @@ -325,7 +328,6 @@ public class PutParquetTest { @Test public void testValidSchemaWithELShouldBeSuccessful() throws InitializationException, IOException { configure(proc, 10); - testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}"); final String filename = "testValidSchemaWithELShouldBeSuccessful-" + System.currentTimeMillis(); @@ -340,39 +342,6 @@ public class PutParquetTest { } @Test - public void testSchemaWithELMissingShouldRouteToFailure() throws InitializationException, IOException { - configure(proc, 10); - testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}"); - - final String filename = "testSchemaWithELMissingShouldRouteToFailure-" + System.currentTimeMillis(); - - // don't provide my.schema as an attribute - final Map<String,String> flowFileAttributes = new HashMap<>(); - flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); - - testRunner.enqueue("trigger", flowFileAttributes); - testRunner.run(); - testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1); - } - - @Test - public void testInvalidSchemaShouldRouteToFailure() throws InitializationException, IOException { - configure(proc, 10); - testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}"); - - final String filename = "testInvalidSchemaShouldRouteToFailure-" + System.currentTimeMillis(); - - // don't provide my.schema as an attribute - final Map<String,String> flowFileAttributes = new HashMap<>(); - flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); - flowFileAttributes.put("my.schema", "NOT A SCHEMA"); - - testRunner.enqueue("trigger", flowFileAttributes); - testRunner.run(); - testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1); - } - - @Test public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException { configure(proc, 10); @@ -427,6 +396,7 @@ public class PutParquetTest { final RecordReader recordReader = Mockito.mock(RecordReader.class); when(recordReader.createRecordSet()).thenReturn(recordSet); + when(recordReader.getSchema()).thenReturn(AvroTypeUtil.createSchema(schema)); final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class); when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
