Repository: nifi Updated Branches: refs/heads/master b7c15c360 -> 9b177fbcb
NIFI-3787: Addressed NPE and ensure that if validation fails due to RuntimeException, that it gets logged. Also clarified documentation for Json Reader services This closes #1742. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9b177fbc Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9b177fbc Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9b177fbc Branch: refs/heads/master Commit: 9b177fbcbabe67b9ebb1d0a5d3fafe7b67aebda1 Parents: b7c15c3 Author: Mark Payne <[email protected]> Authored: Wed May 3 12:33:30 2017 -0400 Committer: Bryan Bende <[email protected]> Committed: Wed May 3 13:13:43 2017 -0400 ---------------------------------------------------------------------- .../nifi/serialization/record/MapRecord.java | 2 +- .../record/util/DataTypeUtils.java | 23 +++++++++ .../nifi/serialization/DateTimeUtils.java | 13 +++-- .../controller/AbstractConfiguredComponent.java | 4 ++ .../java/org/apache/nifi/avro/AvroReader.java | 22 +++++++++ .../apache/nifi/avro/AvroRecordSetWriter.java | 2 - .../avro/EmbeddedAvroSchemaAccessStrategy.java | 51 ++++++++++++++++++++ .../nifi/csv/CSVHeaderSchemaStrategy.java | 9 ++++ .../java/org/apache/nifi/csv/CSVReader.java | 12 ++++- .../org/apache/nifi/csv/CSVRecordSetWriter.java | 3 +- .../java/org/apache/nifi/grok/GrokReader.java | 37 +++++++++----- .../nifi/json/JsonPathRowRecordReader.java | 6 +-- .../nifi/json/JsonTreeRowRecordReader.java | 6 +-- .../additionalDetails.html | 3 +- .../additionalDetails.html | 6 ++- 15 files changed, 169 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java index f9a22fc..8d98c33 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -192,7 +192,7 @@ public class MapRecord implements Record { @Override public Date getAsDate(final String fieldName, final String format) { - return DataTypeUtils.toDate(getValue(fieldName), DataTypeUtils.getDateFormat(format), fieldName); + return DataTypeUtils.toDate(getValue(fieldName), format == null ? null : DataTypeUtils.getDateFormat(format), fieldName); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index f59ac0d..9f1e463 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -32,6 +32,7 @@ import java.sql.Timestamp; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -343,6 +344,10 @@ public class DataTypeUtils { return getDateFormat(format).format((java.util.Date) value); } + if (value instanceof Object[]) { + return Arrays.toString((Object[]) value); + } + return value.toString(); } @@ -396,6 +401,10 @@ public class DataTypeUtils { } if (value instanceof String) { + if (format == null) { + return isInteger((String) value); + } + try { getDateFormat(format).parse((String) value); return true; @@ -407,6 +416,20 @@ public class DataTypeUtils { return false; } + private static boolean isInteger(final String value) { + if (value == null || value.isEmpty()) { + return false; + } + + for (int i = 0; i < value.length(); i++) { + if (!Character.isDigit(value.charAt(i))) { + return false; + } + } + + return true; + } + public static Time toTime(final Object value, final DateFormat format, final String fieldName) { if (value == null) { return null; http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java index efc3e4e..6336943 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java @@ -23,7 +23,9 @@ public class DateTimeUtils { public static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder() .name("Date Format") .description("Specifies the format to use when reading/writing Date fields. " - + "If not specified, Date fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).") + + "If not specified, Date fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). " + + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by " + + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).") .expressionLanguageSupported(false) .addValidator(new SimpleDateFormatValidator()) .required(false) @@ -32,7 +34,9 @@ public class DateTimeUtils { public static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder() .name("Time Format") .description("Specifies the format to use when reading/writing Time fields. " - + "If not specified, Time fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).") + + "If not specified, Time fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). " + + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by " + + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).") .expressionLanguageSupported(false) .addValidator(new SimpleDateFormatValidator()) .required(false) @@ -41,7 +45,10 @@ public class DateTimeUtils { public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder() .name("Timestamp Format") .description("Specifies the format to use when reading/writing Timestamp fields. " - + "If not specified, Timestamp fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).") + + "If not specified, Timestamp fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). " + + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by " + + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by " + + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/01/2017 18:04:15).") .expressionLanguageSupported(false) .addValidator(new SimpleDateFormatValidator()) .required(false) http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index 1156987..50a71e7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -30,6 +30,8 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.URL; import java.util.ArrayList; @@ -49,6 +51,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent { + private static final Logger logger = LoggerFactory.getLogger(AbstractConfiguredComponent.class); private final String id; private final ValidationContextFactory validationContextFactory; @@ -463,6 +466,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone } } } catch (final Throwable t) { + logger.error("Failed to perform validation of " + this, t); results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build()); } finally { lock.unlock(); http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java index 205fec6..8451f5a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java @@ -29,10 +29,14 @@ import org.apache.avro.Schema; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; @@ -63,6 +67,24 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac } @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ConfigurationContext context) { + if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) { + return new EmbeddedAvroSchemaAccessStrategy(); + } else { + return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + } + } + + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ValidationContext context) { + if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) { + return new EmbeddedAvroSchemaAccessStrategy(); + } else { + return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + } + } + + @Override public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { final String schemaAccessStrategy = getConfigurationContext().getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue(); if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index 91ca6cf..121d1ec 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -65,8 +65,6 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement try { final RecordSchema recordSchema = getSchema(flowFile, in); - - final Schema avroSchema; try { if (recordSchema.getSchemaFormat().isPresent() & recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java new file mode 100644 index 0000000..eba9429 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.avro; + +import java.io.IOException; +import java.io.InputStream; +import java.util.EnumSet; +import java.util.Set; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaField; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.record.RecordSchema; + +public class EmbeddedAvroSchemaAccessStrategy implements SchemaAccessStrategy { + private final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); + + @Override + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException { + final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(contentStream, new GenericDatumReader<GenericRecord>()); + final Schema avroSchema = dataFileStream.getSchema(); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); + return recordSchema; + } + + @Override + public Set<SchemaField> getSuppliedSchemaFields() { + return schemaFields; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java index a57f10b..f2b1cbb 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.io.input.BOMInputStream; +import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.schema.access.SchemaAccessStrategy; @@ -47,8 +48,16 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy { this.context = context; } + public CSVHeaderSchemaStrategy(final ValidationContext context) { + this.context = null; + } + @Override public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { + if (this.context == null) { + throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema"); + } + try { final CSVFormat csvFormat = CSVUtils.createCSVFormat(context).withFirstRecordAsHeader(); try (final Reader reader = new InputStreamReader(new BOMInputStream(contentStream)); http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java index 9fe4136..dbea3dc 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java @@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -52,8 +53,6 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact "The first non-comment line of the CSV file is a header line that contains the names of the columns. The schema will be derived by using the " + "column names in the header and assuming that all columns are of type String."); - private volatile SchemaAccessStrategy headerDerivedSchemaStrategy; - private volatile CSVFormat csvFormat; private volatile String dateFormat; private volatile String timeFormat; @@ -106,6 +105,15 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact } @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) { + if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) { + return new CSVHeaderSchemaStrategy(context); + } + + return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + } + + @Override protected List<AllowableValue> getSchemaAccessStrategyValues() { final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues()); allowableValues.add(headerDerivedAllowableValue); http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java index 15f1e1f..0b29f09 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java @@ -38,7 +38,8 @@ import org.apache.nifi.serialization.record.RecordSchema; @Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"}) @CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written " - + "will be the column names. All subsequent lines will be the values corresponding to those columns.") + + "will be the column names (unless the 'Include Header Line' property is false). All subsequent lines will be the values " + + "corresponding to the record fields.") public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory { private volatile CSVFormat csvFormat; http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 6c8deab..a874632 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -33,6 +33,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -180,24 +181,36 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac @Override protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) { - return new SchemaAccessStrategy() { - private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class); - - @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { - return recordSchema; - } + return createAccessStrategy(); + } else { + return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + } + } - @Override - public Set<SchemaField> getSuppliedSchemaFields() { - return schemaFields; - } - }; + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) { + if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) { + return createAccessStrategy(); } else { return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); } } + private SchemaAccessStrategy createAccessStrategy() { + return new SchemaAccessStrategy() { + private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class); + + @Override + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { + return recordSchema; + } + + @Override + public Set<SchemaField> getSuppliedSchemaFields() { + return schemaFields; + } + }; + } @Override public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java index 2eebfe2..7d2a2c1 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java @@ -62,9 +62,9 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader { throws MalformedRecordException, IOException { super(in, logger); - this.dateFormat = DataTypeUtils.getDateFormat(dateFormat); - this.timeFormat = DataTypeUtils.getDateFormat(timeFormat); - this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat); + this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat); + this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat); + this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat); this.schema = schema; this.jsonPaths = jsonPaths; http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index f15e04e..ee5bff9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -56,9 +56,9 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { super(in, logger); this.schema = schema; - this.dateFormat = DataTypeUtils.getDateFormat(dateFormat); - this.timeFormat = DataTypeUtils.getDateFormat(timeFormat); - this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat); + this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat); + this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat); + this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat); } http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html index aceb54d..14d40f6 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html @@ -81,7 +81,8 @@ <li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li> <li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li> <li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding - property (Date Format, Time Format, Timestamp Format property).</li> + property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String + representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li> </ul> <p> http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html index 90980d1..c08e720 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html @@ -25,7 +25,8 @@ The JsonTreeReader Controller Service reads a JSON Object and creates a Record object for the entire JSON Object tree. The Controller Service must be configured with a Schema that describes the structure of the JSON data. If any field exists in the JSON that is not in the schema, that field will be skipped. - If the schema contains a field for which no JSON field exists, a null value will be used in the Record. + If the schema contains a field for which no JSON field exists, a null value will be used in the Record + (or the default value defined in the schema, if applicable). </p> <p> @@ -66,7 +67,8 @@ <li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li> <li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li> <li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding - property (Date Format, Time Format, Timestamp Format property).</li> + property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String + representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li> </ul> <p>
