http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..1848020 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,77 @@ +nifi-record-serialization-services-nar +Copyright 2014-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Grok + The following NOTICE information applies: + Grok + Copyright 2014 Anthony Corbacho, and contributors. 
+ + (ASLv2) Groovy (org.codehaus.groovy:groovy:jar:2.4.5 - http://www.groovy-lang.org) + The following NOTICE information applies: + Groovy Language + Copyright 2003-2015 The respective authors and developers + Developers and Contributors are listed in the project POM file + and Gradle build file + + This product includes software developed by + The Groovy community (http://groovy.codehaus.org/). + + (ASLv2) Google GSON + The following NOTICE information applies: + Copyright 2008 Google Inc. + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + + (ASLv2) JSON-SMART + The following NOTICE information applies: + Copyright 2011 JSON-SMART authors + + (ASLv2) JsonPath + The following NOTICE information applies: + Copyright 2011 JsonPath authors + + (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3) + + (ASLv2) Apache Avro + The following NOTICE information applies: + Apache Avro + Copyright 2009-2013 The Apache Software Foundation + \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore @@ -0,0 +1 @@ +/bin/ http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml new file mode 100644 index 0000000..9b2a56c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -0,0 +1,94 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with this + work for additional information regarding copyright ownership. 
The ASF licenses + this file to You under the Apache License, Version 2.0 (the "License"); you + may not use this file except in compliance with the License. You may obtain + a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-services-bundle</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-record-serialization-services</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>net.sf.opencsv</groupId> + <artifactId>opencsv</artifactId> + <version>2.3</version> + </dependency> + <dependency> + <groupId>io.thekraken</groupId> + <artifactId>grok</artifactId> + <version>0.1.5</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + 
<artifactId>avro</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/csv/extra-white-space.csv</exclude> + <exclude>src/test/resources/csv/multi-bank-account.csv</exclude> + <exclude>src/test/resources/csv/single-bank-account.csv</exclude> + <exclude>src/test/resources/grok/error-with-stack-trace.log</exclude> + <exclude>src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log</exclude> + <exclude>src/test/resources/grok/nifi-log-sample.log</exclude> + <exclude>src/test/resources/grok/single-line-log-messages.txt</exclude> + <exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude> + <exclude>src/test/resources/json/bank-account-array.json</exclude> + <exclude>src/test/resources/json/json-with-unicode.json</exclude> + <exclude>src/test/resources/json/primitive-type-array.json</exclude> + <exclude>src/test/resources/json/single-bank-account.json</exclude> + <exclude>src/test/resources/json/single-element-nested-array.json</exclude> + <exclude>src/test/resources/json/single-element-nested.json</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java new file mode 
100644 index 0000000..fc0c598 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.avro; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RowRecordReaderFactory; + +@Tags({"avro", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"}) +@CapabilityDescription("Parses Avro data and returns each Avro record as an separate record.") +public class AvroReader extends AbstractControllerService implements RowRecordReaderFactory { + + @Override + public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException { + return new AvroRecordReader(in); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java new file mode 100644 index 0000000..e98a5ad --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.avro; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Array; +import org.apache.avro.generic.GenericData.StringType; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + 
+public class AvroRecordReader implements RecordReader { + private final InputStream in; + private final Schema avroSchema; + private final DataFileStream<GenericRecord> dataFileStream; + private RecordSchema recordSchema; + + public AvroRecordReader(final InputStream in) throws IOException, MalformedRecordException { + this.in = in; + + dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>()); + this.avroSchema = dataFileStream.getSchema(); + GenericData.setStringType(this.avroSchema, StringType.String); + } + + @Override + public void close() throws IOException { + dataFileStream.close(); + in.close(); + } + + @Override + public Record nextRecord() throws IOException, MalformedRecordException { + if (!dataFileStream.hasNext()) { + return null; + } + + GenericRecord record = null; + while (record == null && dataFileStream.hasNext()) { + record = dataFileStream.next(); + } + + final RecordSchema schema = getSchema(); + final Map<String, Object> values = convertRecordToObjectArray(record, schema); + return new MapRecord(schema, values); + } + + + private Map<String, Object> convertRecordToObjectArray(final GenericRecord record, final RecordSchema schema) { + final Map<String, Object> values = new HashMap<>(schema.getFieldCount()); + + for (final String fieldName : schema.getFieldNames()) { + final Object value = record.get(fieldName); + + final Field avroField = record.getSchema().getField(fieldName); + if (avroField == null) { + values.put(fieldName, null); + continue; + } + + final Schema fieldSchema = avroField.schema(); + final DataType dataType = schema.getDataType(fieldName).orElse(null); + final Object converted = convertValue(value, fieldSchema, avroField.name(), dataType); + values.put(fieldName, converted); + } + + return values; + } + + + @Override + public RecordSchema getSchema() throws MalformedRecordException { + if (recordSchema != null) { + return recordSchema; + } + + recordSchema = createSchema(avroSchema); + return 
recordSchema; + } + + private RecordSchema createSchema(final Schema avroSchema) { + final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size()); + for (final Field field : avroSchema.getFields()) { + final String fieldName = field.name(); + final DataType dataType = determineDataType(field.schema()); + recordFields.add(new RecordField(fieldName, dataType)); + } + + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + return recordSchema; + } + + private Object convertValue(final Object value, final Schema avroSchema, final String fieldName, final DataType desiredType) { + if (value == null) { + return null; + } + + switch (avroSchema.getType()) { + case UNION: + if (value instanceof GenericData.Record) { + final GenericData.Record record = (GenericData.Record) value; + return convertValue(value, record.getSchema(), fieldName, desiredType); + } + break; + case RECORD: + final GenericData.Record record = (GenericData.Record) value; + final Schema recordSchema = record.getSchema(); + final List<Field> recordFields = recordSchema.getFields(); + final Map<String, Object> values = new HashMap<>(recordFields.size()); + for (final Field field : recordFields) { + final DataType desiredFieldType = determineDataType(field.schema()); + final Object avroFieldValue = record.get(field.name()); + final Object fieldValue = convertValue(avroFieldValue, field.schema(), field.name(), desiredFieldType); + values.put(field.name(), fieldValue); + } + final RecordSchema childSchema = createSchema(recordSchema); + return new MapRecord(childSchema, values); + case BYTES: + final ByteBuffer bb = (ByteBuffer) value; + return bb.array(); + case FIXED: + final GenericFixed fixed = (GenericFixed) value; + return fixed.bytes(); + case ENUM: + return value.toString(); + case NULL: + return null; + case STRING: + return value.toString(); + case ARRAY: + final Array<?> array = (Array<?>) value; + final Object[] valueArray = new Object[array.size()]; + 
for (int i = 0; i < array.size(); i++) { + final Schema elementSchema = avroSchema.getElementType(); + valueArray[i] = convertValue(array.get(i), elementSchema, fieldName, determineDataType(elementSchema)); + } + return valueArray; + case MAP: + final Map<?, ?> avroMap = (Map<?, ?>) value; + final Map<String, Object> map = new HashMap<>(avroMap.size()); + for (final Map.Entry<?, ?> entry : avroMap.entrySet()) { + Object obj = entry.getValue(); + if (obj instanceof Utf8 || obj instanceof CharSequence) { + obj = obj.toString(); + } + + map.put(entry.getKey().toString(), obj); + } + return map; + } + + return value; + } + + + private DataType determineDataType(final Schema avroSchema) { + final Type avroType = avroSchema.getType(); + + switch (avroType) { + case ARRAY: + case BYTES: + case FIXED: + return RecordFieldType.ARRAY.getDataType(); + case BOOLEAN: + return RecordFieldType.BOOLEAN.getDataType(); + case DOUBLE: + return RecordFieldType.DOUBLE.getDataType(); + case ENUM: + case STRING: + return RecordFieldType.STRING.getDataType(); + case FLOAT: + return RecordFieldType.FLOAT.getDataType(); + case INT: + return RecordFieldType.INT.getDataType(); + case LONG: + return RecordFieldType.LONG.getDataType(); + case RECORD: { + final List<Field> avroFields = avroSchema.getFields(); + final List<RecordField> recordFields = new ArrayList<>(avroFields.size()); + + for (final Field field : avroFields) { + final String fieldName = field.name(); + final Schema fieldSchema = field.schema(); + final DataType fieldType = determineDataType(fieldSchema); + recordFields.add(new RecordField(fieldName, fieldType)); + } + + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + return RecordFieldType.RECORD.getDataType(recordSchema); + } + case NULL: + case MAP: + return RecordFieldType.RECORD.getDataType(); + case UNION: { + final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream() + .filter(s -> s.getType() != Type.NULL) + 
.collect(Collectors.toList()); + + if (nonNullSubSchemas.size() == 1) { + return determineDataType(nonNullSubSchemas.get(0)); + } + + final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size()); + for (final Schema subSchema : nonNullSubSchemas) { + final DataType childDataType = determineDataType(subSchema); + possibleChildTypes.add(childDataType); + } + + return RecordFieldType.CHOICE.getDataType(possibleChildTypes); + } + } + + return null; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java new file mode 100644 index 0000000..d56c716 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.avro;

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;

/**
 * Controller service that creates {@link WriteAvroResult} writers. Each writer uses the Avro schema
 * configured in the "Avro Schema" property, together with the inherited date, time and timestamp
 * formats.
 */
@Tags({"avro", "result", "set", "writer", "serializer", "record", "row"})
@CapabilityDescription("Writes the contents of a Database ResultSet in Binary Avro format. The data types in the Result Set must match those "
    + "specified by the Avro Schema. No type coercion will occur, with the exception of Date, Time, and Timestamps fields because Avro does not provide "
    + "support for these types specifically. As a result, they will be converted to String fields using the configured formats. In addition, the label "
    + "of the column must be a valid Avro field name.")
public class AvroRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory {
    /** Avro schema text that written records are built against; validated by {@link AvroSchemaValidator}. */
    static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
        .name("Avro Schema")
        .description("The Avro Schema to use when writing out the Result Set")
        .addValidator(new AvroSchemaValidator())
        .expressionLanguageSupported(false)
        .required(true)
        .build();

    // Parsed SCHEMA value. Assigned in storePropertyValues() and read by createWriter(); volatile so
    // that a value written by one thread is visible to others.
    private volatile Schema schema;

    /** Returns the inherited property descriptors plus {@link #SCHEMA}. */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
        properties.add(SCHEMA);
        return properties;
    }

    /** Parses the configured Avro schema once, when the service is enabled. */
    @OnEnabled
    public void storePropertyValues(final ConfigurationContext context) {
        schema = new Schema.Parser().parse(context.getProperty(SCHEMA).getValue());
    }

    /**
     * Creates a writer for the schema parsed in {@link #storePropertyValues(ConfigurationContext)}.
     *
     * @param logger not used by this implementation
     */
    @Override
    public RecordSetWriter createWriter(final ComponentLog logger) {
        return new WriteAvroResult(schema, getDateFormat(), getTimeFormat(), getTimestampFormat());
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java new file mode 100644 index 0000000..7151348 --- /dev/null +++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.avro; + +import org.apache.avro.Schema; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; + +public class AvroSchemaValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + try { + new Schema.Parser().parse(input); + + return new ValidationResult.Builder() + .valid(true) + .build(); + } catch (final Exception e) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Not a valid Avro Schema: " + e.getMessage()) + .build(); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java new file mode 100644 index 0000000..d75d86d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.avro; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; + +public class WriteAvroResult implements RecordSetWriter { + private final Schema schema; + private final DateFormat dateFormat; + private final DateFormat timeFormat; + private final DateFormat timestampFormat; + + public WriteAvroResult(final Schema schema, final String dateFormat, final String timeFormat, final 
String timestampFormat) {
+        this.schema = schema;
+        this.dateFormat = new SimpleDateFormat(dateFormat);
+        this.timeFormat = new SimpleDateFormat(timeFormat);
+        this.timestampFormat = new SimpleDateFormat(timestampFormat);
+    }
+
+    /**
+     * Writes every record of the given RecordSet into a single Avro data file on the given stream.
+     * If the RecordSet yields no records at all, nothing is written to the stream.
+     *
+     * @param rs the records to write
+     * @param outStream the stream that the Avro data file is written to
+     * @return a WriteResult holding the number of records written
+     * @throws IOException if reading, converting or writing the records fails
+     */
+    @Override
+    public WriteResult write(final RecordSet rs, final OutputStream outStream) throws IOException {
+        Record record = rs.next();
+        if (record == null) {
+            return WriteResult.of(0, Collections.emptyMap());
+        }
+
+        // A single Avro record instance is reused for every row; each row overwrites the fields it populates
+        final GenericRecord rec = new GenericData.Record(schema);
+
+        int nrOfRows = 0;
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, outStream);
+
+            final RecordSchema recordSchema = rs.getSchema();
+
+            do {
+                populateAvroRecord(rec, record, recordSchema);
+                dataFileWriter.append(rec);
+                nrOfRows++;
+            } while ((record = rs.next()) != null);
+        }
+
+        return WriteResult.of(nrOfRows, Collections.emptyMap());
+    }
+
+    /**
+     * Writes a single record to the given stream as a complete Avro data file.
+     *
+     * @param record the record to write
+     * @param out the stream that the Avro data file is written to
+     * @return a WriteResult holding a record count of 1
+     * @throws IOException if converting or writing the record fails
+     */
+    @Override
+    public WriteResult write(final Record record, final OutputStream out) throws IOException {
+        final GenericRecord rec = new GenericData.Record(schema);
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, out);
+            populateAvroRecord(rec, record, record.getSchema());
+            dataFileWriter.append(rec);
+        }
+
+        return WriteResult.of(1, Collections.emptyMap());
+    }
+
+    /**
+     * Copies each field of the NiFi record that also exists in the writer's Avro schema into the given Avro record.
+     * Each value is converted to a type that Avro can serialize. Fields unknown to the Avro schema are skipped.
+     *
+     * @param rec the Avro record to populate
+     * @param record the NiFi record to read values from
+     * @param recordSchema the schema whose field names are copied
+     * @throws IOException if a value cannot be converted
+     */
+    private void populateAvroRecord(final GenericRecord rec, final Record record, final RecordSchema recordSchema) throws IOException {
+        for (final String fieldName : recordSchema.getFieldNames()) {
+            final Field field = schema.getField(fieldName);
+            if (field == null) {
+                continue;
+            }
+
+            final Object converted;
+            try {
+                converted = convert(record.getValue(fieldName), field.schema(), fieldName);
+            } catch (final SQLException e) {
+                throw new IOException("Failed to write records to stream", e);
+            }
+
+            rec.put(fieldName, converted);
+        }
+    }
+
+
+    /**
+     * Converts a record value into an object that Avro's GenericDatumWriter can serialize for the given field schema.
+     *
+     * @param value the value to convert; may be null
+     * @param schema the Avro schema of the field the value is written to
+     * @param fieldName the name of the field, passed along when converting array elements
+     * @return the converted value, or null if the value is null
+     * @throws SQLException if a CLOB or BLOB value cannot be read
+     * @throws IOException if a nested value cannot be converted
+     */
+    private Object convert(final Object value, final Schema schema, final String fieldName) throws SQLException, IOException {
+        if (value == null) {
+            return null;
+        }
+
+        // Need to handle CLOB and BLOB before getObject() is called, due to ResultSet's maximum portability statement
+        if (value instanceof Clob) {
+            final Clob clob = (Clob) value;
+            try {
+                // Read the whole CLOB as characters. The ASCII stream would corrupt non-ASCII text, and
+                // stopping at the first zero-valued character would silently truncate the content.
+                final long numChars = clob.length();
+                return numChars == 0 ? "" : clob.getSubString(1, (int) numChars);
+            } finally {
+                clob.free();
+            }
+        }
+
+        if (value instanceof Blob) {
+            final Blob blob = (Blob) value;
+            try {
+                // Read the whole BLOB up to its declared length. Binary content routinely contains 0 bytes,
+                // so reading "until the first 0" would truncate it.
+                final long numBytes = blob.length();
+                final byte[] buffer = numBytes == 0 ? new byte[0] : blob.getBytes(1, (int) numBytes);
+                return ByteBuffer.wrap(buffer);
+            } finally {
+                blob.free();
+            }
+        }
+
+        if (value instanceof byte[]) {
+            // bytes requires little bit different handling
+            return ByteBuffer.wrap((byte[]) value);
+        } else if (value instanceof Byte) {
+            // tinyint(1) type is returned by JDBC driver as java.sql.Types.TINYINT
+            // But value is returned by JDBC as java.lang.Byte
+            // (at least H2 JDBC works this way)
+            // direct put to avro record results:
+            // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
+            return ((Byte) value).intValue();
+        } else if (value instanceof Short) {
+            //MS SQL returns TINYINT as a Java Short, which Avro doesn't understand.
+            return ((Short) value).intValue();
+        } else if (value instanceof BigDecimal) {
+            // Avro can't handle BigDecimal as a number - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
+            return value.toString();
+        } else if (value instanceof BigInteger) {
+            // Some databases allow BIGINT values of arbitrary precision, but Avro can only serialize such a value as a
+            // long. Otherwise it throws an AvroRuntimeException such as "Unknown datum type: java.math.BigInteger: 38".
+            // Values that fit in a signed long (bitLength < 64, positive or negative) are written as longs; anything
+            // larger is written as a string.
+            final BigInteger bigInt = (BigInteger) value;
+            if (bigInt.bitLength() < Long.SIZE) {
+                return bigInt.longValue();
+            }
+            return value.toString();
+        } else if (value instanceof Boolean) {
+            return value;
+        } else if (value instanceof Record) {
+            // Nested NiFi records are converted field-by-field when the Avro schema expects a record
+            if (schema.getType() == Schema.Type.RECORD) {
+                final GenericData.Record avroRecord = new GenericData.Record(schema);
+                final Record record = (Record) value;
+                for (final String recordFieldName : record.getSchema().getFieldNames()) {
+                    final Field field = schema.getField(recordFieldName);
+                    if (field == null) {
+                        continue;
+                    }
+
+                    avroRecord.put(recordFieldName, convert(record.getValue(recordFieldName), field.schema(), recordFieldName));
+                }
+                return avroRecord;
+            }
+            return value.toString();
+        } else if (value instanceof Map) {
+            // TODO: Revisit how we handle a lot of these cases....
+            final Schema.Type type = schema.getType();
+            if (type == Schema.Type.MAP) {
+                return value;
+            }
+            if (type == Schema.Type.RECORD) {
+                // Populate the Avro record from the map entries whose keys are field names of the schema
+                final Map<?, ?> map = (Map<?, ?>) value;
+                final GenericData.Record avroRecord = new GenericData.Record(schema);
+                for (final Field field : schema.getFields()) {
+                    if (map.containsKey(field.name())) {
+                        avroRecord.put(field.name(), convert(map.get(field.name()), field.schema(), field.name()));
+                    }
+                }
+                return avroRecord;
+            }
+            return value.toString();
+        } else if (value instanceof List) {
+            return value;
+        } else if (value instanceof Object[]) {
+            final List<Object> list = new ArrayList<>();
+            for (final Object o : (Object[]) value) {
+                list.add(convert(o, schema.getElementType(), fieldName));
+            }
+            return list;
+        } else if (value instanceof Number) {
+            return value;
+        } else if (value instanceof Timestamp) {
+            // java.sql.Timestamp, java.sql.Time and java.sql.Date all extend java.util.Date, so the more specific
+            // types must be tested first; otherwise every one of them would be formatted with the date format.
+            final Timestamp timestamp = (Timestamp) value;
+            return timestampFormat.format(new java.util.Date(timestamp.getTime()));
+        } else if (value instanceof Time) {
+            final Time time = (Time) value;
+            return timeFormat.format(new java.util.Date(time.getTime()));
+        } else if (value instanceof java.util.Date) {
+            // Covers java.sql.Date as well as plain java.util.Date values
+            final java.util.Date date = (java.util.Date) value;
+            return dateFormat.format(date);
+        }
+
+        // The different types that we support are numbers (int, long, double, float),
+        // as well as boolean values and Strings. Since Avro doesn't provide
+        // timestamp types, we want to convert those to Strings. So we will cast anything other
+        // than numbers or booleans to strings by using the toString() method.
+        return value.toString();
+    }
+
+
+    @Override
+    public String getMimeType() {
+        return "application/avro-binary";
+    }
+
+
+    /**
+     * Turns an arbitrary name into a legal Avro name. Every character other than [A-Za-z0-9_] is replaced
+     * with an underscore, and an underscore is prepended if the result would otherwise start with a digit.
+     *
+     * @param inputName the name to normalize
+     * @return the normalized name; an empty input yields an empty result instead of an exception
+     */
+    public static String normalizeNameForAvro(String inputName) {
+        String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_");
+        if (!normalizedName.isEmpty() && Character.isDigit(normalizedName.charAt(0))) {
+            normalizedName = "_" + normalizedName;
+        }
+        return normalizedName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
new file mode 100644
index 0000000..eccad7d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.UserTypeOverrideRowReader;
+
+@Tags({"csv", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"})
+@CapabilityDescription("Parses CSV-formatted data, returning each row in the CSV file as a separate record. "
+    + "This reader assumes that the first line in the content is the column names and all subsequent lines are "
+    + "the values. By default, the reader will assume that all columns are of 'String' type, but this can be "
+    + "overridden by adding a user-defined Property where the key is the name of a column and the value is the "
+    + "type of the column. For example, if a Property has the name \"balance\" with a value of float, the "
+    + "reader will attempt to coerce all values in the \"balance\" column into a floating-point number. See "
+    + "Controller Service's Usage for further documentation.")
+@DynamicProperty(name = "<name of column in CSV>", value = "<type of column values in CSV>",
+    description = "User-defined properties are used to indicate that the values of a specific column should be interpreted as a "
+    + "user-defined data type (e.g., int, double, float, date, etc.)", supportsExpressionLanguage = false)
+public class CSVReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory {
+
+    /**
+     * Creates a CSVRecordReader over the given stream. The field type overrides returned by
+     * getFieldTypeOverrides() are passed along to it.
+     *
+     * @param in the CSV content to read
+     * @param logger the logger handed to the record reader
+     * @return a new CSVRecordReader
+     * @throws IOException if the record reader cannot read the header line
+     */
+    @Override
+    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException {
+        return new CSVRecordReader(in, logger, getFieldTypeOverrides());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
new file mode 100644
index 0000000..c2e8963
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import au.com.bytecode.opencsv.CSVReader;
+
+public class CSVRecordReader implements RecordReader {
+    private final ComponentLog logger;
+    private final CSVReader reader;
+    private final String[] firstLine;
+    private final Map<String, DataType> fieldTypeOverrides;
+    private RecordSchema schema;
+
+    /**
+     * Creates a reader that treats the first CSV line of the stream as the header (column names).
+     * The content is decoded as UTF-8.
+     *
+     * @param in the CSV content
+     * @param logger used to report skipped lines and unparseable date/time values
+     * @param fieldTypeOverrides maps column names to the data type their values are converted to;
+     *            columns without an entry are read as Strings
+     * @throws IOException if the header line cannot be read
+     */
+    public CSVRecordReader(final InputStream in, final ComponentLog logger, final Map<String, DataType> fieldTypeOverrides) throws IOException {
+        this.logger = logger;
+        reader = new CSVReader(new InputStreamReader(new BufferedInputStream(in), StandardCharsets.UTF_8));
+        // readNext() returns null for empty input; getSchema() handles that case
+        firstLine = reader.readNext();
+        this.fieldTypeOverrides = fieldTypeOverrides;
+    }
+
+    /**
+     * Returns the next CSV line as a Record, or null when the input is exhausted.
+     * Lines whose column count differs from the schema's are logged and skipped.
+     *
+     * @throws MalformedRecordException if a value cannot be converted to its column's data type
+     */
+    @Override
+    public Record nextRecord() throws IOException, MalformedRecordException {
+        final RecordSchema schema = getSchema();
+        final int expectedFieldCount = schema.getDataTypes().size();
+
+        String[] line;
+        while ((line = reader.readNext()) != null) {
+            if (line.length != expectedFieldCount) {
+                logger.warn("Found record with incorrect number of fields. Expected {} but found {}; skipping record", new Object[] {expectedFieldCount, line.length});
+                continue;
+            }
+
+            try {
+                final Map<String, Object> rowValues = new HashMap<>(schema.getFieldCount());
+
+                int i = 0;
+                for (final String fieldName : schema.getFieldNames()) {
+                    // Defensive: only reachable if the schema reports more field names than data types
+                    if (i >= line.length) {
+                        rowValues.put(fieldName, null);
+                        continue;
+                    }
+
+                    final String rawValue = line[i++].trim();
+                    rowValues.put(fieldName, convert(schema.getDataType(fieldName).orElse(null), rawValue));
+                }
+
+                return new MapRecord(schema, rowValues);
+            } catch (final Exception e) {
+                throw new MalformedRecordException("Found invalid CSV record", e);
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Builds the schema from the header line (lazily, once). Every column is a String unless it has an entry
+     * in the field type overrides. If there is no usable header, for example because the input is empty,
+     * the schema consists of a single String column named "line".
+     */
+    @Override
+    public RecordSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+
+        final List<RecordField> recordFields = new ArrayList<>();
+        if (firstLine != null) {
+            for (final String element : firstLine) {
+                final String name = element.trim();
+                final DataType overriddenDataType = fieldTypeOverrides.get(name);
+                final DataType dataType = overriddenDataType == null ? RecordFieldType.STRING.getDataType() : overriddenDataType;
+                recordFields.add(new RecordField(name, dataType));
+            }
+        }
+
+        if (recordFields.isEmpty()) {
+            recordFields.add(new RecordField("line", RecordFieldType.STRING.getDataType()));
+        }
+
+        schema = new SimpleRecordSchema(recordFields);
+        return schema;
+    }
+
+    /**
+     * Converts a raw (already trimmed) CSV value into the Java type for the given data type.
+     * For the numeric, boolean and date/time types, an empty value becomes null. STRING and any other
+     * type return the raw value unchanged, as does a null data type.
+     *
+     * @param dataType the target data type, or null to keep the raw String
+     * @param value the raw value
+     * @return the converted value
+     * @throws NumberFormatException if a non-empty value is not a valid number
+     */
+    protected Object convert(final DataType dataType, final String value) {
+        if (dataType == null) {
+            return value;
+        }
+
+        switch (dataType.getFieldType()) {
+            case BOOLEAN:
+                return value.isEmpty() ? null : Boolean.parseBoolean(value);
+            case BYTE:
+                return value.isEmpty() ? null : Byte.parseByte(value);
+            case SHORT:
+                return value.isEmpty() ? null : Short.parseShort(value);
+            case INT:
+                return value.isEmpty() ? null : Integer.parseInt(value);
+            case LONG:
+            case BIGINT:
+                return value.isEmpty() ? null : Long.parseLong(value);
+            case FLOAT:
+                return value.isEmpty() ? null : Float.parseFloat(value);
+            case DOUBLE:
+                return value.isEmpty() ? null : Double.parseDouble(value);
+            case DATE: {
+                final Date date = parseDate(value, dataType, "DATE");
+                return date == null ? null : new java.sql.Date(date.getTime());
+            }
+            case TIME: {
+                final Date date = parseDate(value, dataType, "TIME");
+                return date == null ? null : new java.sql.Time(date.getTime());
+            }
+            case TIMESTAMP: {
+                final Date date = parseDate(value, dataType, "TIMESTAMP");
+                return date == null ? null : new java.sql.Timestamp(date.getTime());
+            }
+            case STRING:
+            default:
+                return value;
+        }
+    }
+
+    /**
+     * Parses a DATE/TIME/TIMESTAMP value with the format configured on its data type.
+     *
+     * @param value the raw value
+     * @param dataType the data type providing the format
+     * @param typeName the type name used in the warning message
+     * @return the parsed date, or null if the value is empty or does not match the format
+     *         (in the latter case a warning is logged)
+     */
+    private Date parseDate(final String value, final DataType dataType, final String typeName) {
+        if (value.isEmpty()) {
+            return null;
+        }
+
+        try {
+            return new SimpleDateFormat(dataType.getFormat()).parse(value);
+        } catch (final ParseException e) {
+            logger.warn("Found invalid value for " + typeName + " field: " + value + " does not match expected format of "
+                + dataType.getFormat() + "; will substitute a NULL value for this field");
+            return null;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
new file mode 100644
index 0000000..906e9c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+@Tags({"csv", "result", "set", "writer", "serializer", "record", "row"})
+@CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written "
+    + "will be the column names. All subsequent lines will be the values corresponding to those columns.")
+public class CSVRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory {
+
+    /**
+     * Creates a CSV writer that formats date, time and timestamp values with the formats returned by
+     * getDateFormat(), getTimeFormat() and getTimestampFormat(). The supplied logger is not used.
+     *
+     * @param logger unused
+     * @return a new WriteCSVResult
+     */
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger) {
+        return new WriteCSVResult(getDateFormat(), getTimeFormat(), getTimestampFormat());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
new file mode 100644
index 0000000..79c602d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Optional;
+
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+
+import au.com.bytecode.opencsv.CSVWriter;
+
+public class WriteCSVResult implements RecordSetWriter {
+    private final String dateFormat;
+    private final String timeFormat;
+    private final String timestampFormat;
+
+    /**
+     * @param dateFormat format for DATE fields, or null to use the format of the field's DataType
+     * @param timeFormat format for TIME fields, or null to use the format of the field's DataType
+     * @param timestampFormat format for TIMESTAMP fields, or null to use the format of the field's DataType
+     */
+    public WriteCSVResult(final String dateFormat, final String timeFormat, final String timestampFormat) {
+        this.dateFormat = dateFormat;
+        this.timeFormat = timeFormat;
+        this.timestampFormat = timestampFormat;
+    }
+
+    /**
+     * Returns the format used to render the given field as a String. For DATE, TIME and TIMESTAMP fields the
+     * writer-level format wins when one is configured. Otherwise the DataType's own format is used.
+     * Returns null when the field is not in the record's schema.
+     */
+    private String getFormat(final Record record, final String fieldName) {
+        final Optional<DataType> dataTypeOption = record.getSchema().getDataType(fieldName);
+        if (!dataTypeOption.isPresent()) {
+            return null;
+        }
+
+        final DataType dataType = dataTypeOption.get();
+        switch (dataType.getFieldType()) {
+            case DATE:
+                return dateFormat == null ? dataType.getFormat() : dateFormat;
+            case TIME:
+                return timeFormat == null ? dataType.getFormat() : timeFormat;
+            case TIMESTAMP:
+                return timestampFormat == null ? dataType.getFormat() : timestampFormat;
+            default:
+                return dataType.getFormat();
+        }
+    }
+
+    /**
+     * Writes a header line followed by one CSV line per record, encoded as UTF-8. The underlying stream is
+     * flushed but not closed.
+     *
+     * @return a WriteResult holding the number of records written (excluding the header)
+     * @throws IOException if the records cannot be read or serialized
+     */
+    @Override
+    public WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException {
+        int count = 0;
+        try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut);
+            final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable, StandardCharsets.UTF_8);
+            final CSVWriter writer = new CSVWriter(streamWriter)) {
+
+            try {
+                final RecordSchema schema = rs.getSchema();
+                writeHeader(writer, schema);
+
+                Record record;
+                while ((record = rs.next()) != null) {
+                    writeRow(writer, schema, record);
+                    count++;
+                }
+            } catch (final Exception e) {
+                throw new IOException("Failed to serialize results", e);
+            }
+        }
+
+        return WriteResult.of(count, Collections.emptyMap());
+    }
+
+    /**
+     * Writes a header line followed by a single CSV line for the given record, encoded as UTF-8.
+     * The underlying stream is flushed but not closed.
+     *
+     * @return a WriteResult holding a record count of 1
+     * @throws IOException if the record cannot be serialized
+     */
+    @Override
+    public WriteResult write(final Record record, final OutputStream rawOut) throws IOException {
+        try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut);
+            final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable, StandardCharsets.UTF_8);
+            final CSVWriter writer = new CSVWriter(streamWriter)) {
+
+            try {
+                final RecordSchema schema = record.getSchema();
+                writeHeader(writer, schema);
+                writeRow(writer, schema, record);
+            } catch (final Exception e) {
+                throw new IOException("Failed to serialize results", e);
+            }
+        }
+
+        return WriteResult.of(1, Collections.emptyMap());
+    }
+
+    /** Writes the field names of the schema as the CSV header line. */
+    private void writeHeader(final CSVWriter writer, final RecordSchema schema) {
+        writer.writeNext(schema.getFieldNames().toArray(new String[0]));
+    }
+
+    /** Writes the record's values, in schema field order and rendered with getFormat, as one CSV line. */
+    private void writeRow(final CSVWriter writer, final RecordSchema schema, final Record record) {
+        final String[] colVals = new String[schema.getFieldCount()];
+        int i = 0;
+        for (final String fieldName : schema.getFieldNames()) {
+            colVals[i++] = record.getAsString(fieldName, getFormat(record, fieldName));
+        }
+
+        writer.writeNext(colVals);
+    }
+
+    @Override
+    public String getMimeType() {
+        return "text/csv";
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
new file mode 100644
index 0000000..dd9c4e0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+import io.thekraken.grok.api.Grok;
+
+/**
+ * Validates a Grok expression by compiling it. Compilation uses the built-in default pattern definitions and,
+ * when the 'Grok Pattern File' property is set, the patterns from that file. Compiling against an empty Grok
+ * instance would reject any expression that references a named pattern.
+ */
+public class GrokExpressionValidator implements Validator {
+
+    private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt";
+
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        try {
+            createGrok(context).compile(input);
+        } catch (final Exception e) {
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(false)
+                .explanation("Invalid Grok pattern: " + e.getMessage())
+                .build();
+        }
+
+        return new ValidationResult.Builder()
+            .input(input)
+            .subject(subject)
+            .valid(true)
+            .build();
+    }
+
+    /**
+     * Creates a Grok instance loaded with the same pattern sources that GrokReader uses: the bundled default
+     * patterns, then the configured pattern file (if any).
+     *
+     * @param context the validation context; may be null, in which case only the default patterns are loaded
+     * @return a Grok instance ready to compile expressions
+     * @throws Exception if a pattern source cannot be read
+     */
+    private static Grok createGrok(final ValidationContext context) throws Exception {
+        final Grok grok = new Grok();
+
+        try (final InputStream in = GrokExpressionValidator.class.getResourceAsStream(DEFAULT_PATTERN_NAME)) {
+            if (in != null) {
+                try (final Reader reader = new InputStreamReader(in)) {
+                    grok.addPatternFromReader(reader);
+                }
+            }
+        }
+
+        if (context != null && context.getProperty(GrokReader.PATTERN_FILE).isSet()) {
+            grok.addPatternFromFile(context.getProperty(GrokReader.PATTERN_FILE).evaluateAttributeExpressions().getValue());
+        }
+
+        return grok;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
new file mode 100644
index 0000000..f72d5d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.UserTypeOverrideRowReader;
+
+import io.thekraken.grok.api.Grok;
+import io.thekraken.grok.api.exception.GrokException;
+
+@Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", "record", "reader", "regex", "pattern", "logstash"})
+@CapabilityDescription("Provides a mechanism for reading unstructured text data, such as log files, and structuring the data "
+    + "so that it can be processed. The service is configured using Grok patterns. "
+    + "The service reads from a stream of data and splits each message that it finds into a separate Record, each containing the fields that are configured. "
+    + "If a line in the input does not match the expected message pattern, the line of text is considered to be part of the previous "
+    + "message, with the exception of stack traces. A stack trace that is found at the end of a log message is considered to be part "
+    + "of the previous message but is added to the 'STACK_TRACE' field of the Record. If a record has no stack trace, it will have a NULL value "
+    + "for the STACK_TRACE field.")
+public class GrokReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory {
+    private volatile Grok grok;
+
+    private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt";
+
+    static final PropertyDescriptor PATTERN_FILE = new PropertyDescriptor.Builder()
+        .name("Grok Pattern File")
+        .description("Path to a file that contains Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file "
+            + "will be used. If specified, all patterns in the given pattern file will override the default patterns. See the Controller Service's "
+            + "Additional Details for a list of pre-defined patterns.")
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(false)
+        .build();
+    static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder()
+        .name("Grok Expression")
+        .description("Specifies the format of a log line in Grok format. This allows the Record Reader to understand how to parse each log line. "
+            + "If a line in the log file does not match this pattern, the line will be assumed to belong to the previous log message.")
+        .addValidator(new GrokExpressionValidator())
+        .required(true)
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(PATTERN_FILE);
+        properties.add(GROK_EXPRESSION);
+        return properties;
+    }
+
+    /**
+     * Builds and compiles the Grok instance shared by every reader that this service creates. The bundled
+     * default patterns are loaded first, so a configured pattern file can override them. The new instance is
+     * published only after it has compiled successfully.
+     *
+     * @param context the configuration of this service
+     * @throws GrokException if the pattern file or the Grok expression is invalid
+     * @throws IOException if a pattern source cannot be read
+     */
+    @OnEnabled
+    public void preCompile(final ConfigurationContext context) throws GrokException, IOException {
+        final Grok newGrok = new Grok();
+
+        final InputStream defaultPatterns = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
+        if (defaultPatterns == null) {
+            throw new IOException("Could not find the default Grok patterns resource " + DEFAULT_PATTERN_NAME);
+        }
+        try (final InputStream in = defaultPatterns;
+            final Reader reader = new InputStreamReader(in)) {
+            newGrok.addPatternFromReader(reader);
+        }
+
+        if (context.getProperty(PATTERN_FILE).isSet()) {
+            // The property supports Expression Language, so evaluate it before using the value as a path
+            newGrok.addPatternFromFile(context.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue());
+        }
+
+        newGrok.compile(context.getProperty(GROK_EXPRESSION).getValue());
+        grok = newGrok;
+    }
+
+    @Override
+    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException {
+        return new GrokRecordReader(in, grok, getFieldTypeOverrides());
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
new file mode 100644
index 0000000..bdf12f9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.grok; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.time.FastDateFormat; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import 
org.apache.nifi.serialization.record.RecordSchema;

import io.thekraken.grok.api.Grok;
import io.thekraken.grok.api.GrokUtils;
import io.thekraken.grok.api.Match;

/**
 * A {@link RecordReader} that turns log text into records using a compiled {@link Grok} expression.
 * <p>
 * Every line that matches the Grok expression starts a new record. Lines that follow it and do not match
 * are attached to that record: a line that looks like the start of a Java stack trace (together with the
 * stack-trace lines after it) goes into the {@value #STACK_TRACE_COLUMN_NAME} field, and any other line is
 * appended, newline-separated, to the last field captured by the Grok expression.
 * <p>
 * The schema consists of the named captures ({@code %{PATTERN:name}}) of the Grok expression, in the order
 * they appear, followed by {@value #STACK_TRACE_COLUMN_NAME}. Fields are STRING unless a type override is
 * supplied for them.
 * <p>
 * An instance keeps an unsynchronized one-line look-ahead and a lazily built schema, so it should not be
 * shared between threads.
 */
public class GrokRecordReader implements RecordReader {
    static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE";

    /**
     * Recognizes a line that continues a Java stack trace: an "at ..." frame, a circular-reference marker,
     * a "Caused by: " or "Suppressed: " header, or a "... N more" / "... N common frames omitted" trailer.
     * <p>
     * The whole alternation is anchored to the start of the line (optionally after whitespace) so that an
     * ordinary log line that merely contains e.g. "Caused by: " somewhere is not swallowed into the
     * preceding stack trace.
     */
    private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
        "^\\s*(?:"
            + "(?:(?: |\\t)+at )"
            + "|(?:(?: |\\t)+\\[CIRCULAR REFERENCE\\:)"
            + "|(?:Caused by\\: )"
            + "|(?:Suppressed\\: )"
            + "|(?:\\s+\\.\\.\\. \\d+ (?:more|common frames? omitted)$)"
            + ")");

    // Formats used by convert(); all interpret values in GMT.
    private static final FastDateFormat TIME_FORMAT_DATE;
    private static final FastDateFormat TIME_FORMAT_TIME;
    private static final FastDateFormat TIME_FORMAT_TIMESTAMP;

    static {
        final TimeZone gmt = TimeZone.getTimeZone("GMT");
        TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
        TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
        TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
    }

    private final BufferedReader reader;
    private final Grok grok;
    private final Map<String, DataType> fieldTypeOverrides;

    /** Line already read from {@link #reader} but not yet consumed; {@code null} when there is none. */
    private String nextLine;
    /** Schema derived from the Grok expression; built on first use by {@link #getSchema()}. */
    private RecordSchema schema;

    /**
     * @param in                 the log text to parse (decoded with the platform default charset)
     * @param grok               a Grok instance whose expression has already been compiled
     * @param fieldTypeOverrides data types keyed by capture name, for fields that should not be STRING;
     *                           {@code null} is treated as "no overrides"
     */
    public GrokRecordReader(final InputStream in, final Grok grok, final Map<String, DataType> fieldTypeOverrides) {
        this.reader = new BufferedReader(new InputStreamReader(in));
        this.grok = grok;
        this.fieldTypeOverrides = fieldTypeOverrides == null ? Collections.<String, DataType>emptyMap() : fieldTypeOverrides;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }

    /**
     * Reads the next log message as a record.
     *
     * @return the next record; a record with no values if the next line does not match the Grok
     *         expression; or {@code null} at end of input
     * @throws IOException              if reading the underlying stream fails
     * @throws MalformedRecordException if a captured value cannot be converted to its field's type
     */
    @Override
    public Record nextRecord() throws IOException, MalformedRecordException {
        final String line = nextLine == null ? reader.readLine() : nextLine;
        nextLine = null; // ensure that we don't process nextLine again
        if (line == null) {
            return null;
        }

        final RecordSchema recordSchema = getSchema();

        final Map<String, Object> valueMap = matchLine(line);
        if (valueMap.isEmpty()) {
            // The line does not match the Grok expression, so return a record without any values.
            return new MapRecord(recordSchema, Collections.emptyMap());
        }

        // Look ahead: a matching line is left in nextLine for the next call; non-matching lines belong to
        // this record, either as a stack trace or as a continuation of the last captured field.
        String stackTrace = null;
        final StringBuilder toAppend = new StringBuilder();
        while ((nextLine = reader.readLine()) != null) {
            if (!matchLine(nextLine).isEmpty()) {
                // The next line starts a new record.
                break;
            }

            if (isStartOfStackTrace(nextLine)) {
                // readStackTrace leaves the first line after the stack trace (or null at EOF) in nextLine.
                stackTrace = readStackTrace(nextLine);
                break;
            }

            toAppend.append("\n").append(nextLine);
        }

        try {
            final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());

            for (final String fieldName : recordSchema.getFieldNames()) {
                final Object value = valueMap.get(fieldName);
                if (value == null) {
                    values.put(fieldName, null);
                    continue;
                }

                final DataType fieldType = recordSchema.getDataType(fieldName).orElse(null);
                values.put(fieldName, convert(fieldType, value.toString()));
            }

            // The field just before STACK_TRACE is the last one captured by the Grok expression. If the schema
            // has no captured fields at all there is nothing to append continuation lines to.
            if (toAppend.length() > 0 && recordSchema.getFieldCount() > 1) {
                final String lastFieldBeforeStackTrace = recordSchema.getFieldNames().get(recordSchema.getFieldCount() - 2);
                final Object existingValue = values.get(lastFieldBeforeStackTrace);
                final String updatedValue = existingValue == null ? toAppend.toString() : existingValue + toAppend.toString();
                values.put(lastFieldBeforeStackTrace, updatedValue);
            }

            values.put(STACK_TRACE_COLUMN_NAME, stackTrace);

            return new MapRecord(recordSchema, values);
        } catch (final Exception e) {
            throw new MalformedRecordException("Found invalid log record and will skip it. Record: " + line, e);
        }
    }

    /**
     * Matches one line against the Grok expression and returns the captured values. The reader treats an
     * empty map as "the line did not match".
     */
    private Map<String, Object> matchLine(final String text) {
        final Match match = grok.match(text);
        match.captures();
        return match.toMap();
    }

    /**
     * Heuristically decides whether a line is the first line of a Java stack trace, i.e. it contains
     * "Exception: " (or, failing that, "Error: ") and no space occurs before that marker.
     */
    private boolean isStartOfStackTrace(final String line) {
        if (line == null) {
            return false;
        }

        // Stack Traces are generally of the form:
        // java.lang.IllegalArgumentException: My message
        //    at org.apache.nifi.MyClass.myMethod(MyClass.java:48)
        //    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
        // Caused by: java.net.SocketTimeoutException: null
        //    ... 13 common frames omitted

        int index = line.indexOf("Exception: ");
        if (index < 0) {
            index = line.indexOf("Error: ");
        }

        if (index < 0) {
            return false;
        }

        // A space before the marker means the text before it is not a bare class name.
        if (line.indexOf(" ") < index) {
            return false;
        }

        return true;
    }

    /**
     * Reads the remaining lines of a stack trace whose first line has already been read.
     * <p>
     * On return, {@link #nextLine} holds the first line that is not part of the stack trace, or {@code null}
     * if the input ended inside the stack trace.
     *
     * @param firstLine the already-consumed first line of the stack trace
     * @return the stack trace lines joined with "\n"
     */
    private String readStackTrace(final String firstLine) throws IOException {
        final StringBuilder sb = new StringBuilder(firstLine);

        // firstLine has been consumed; without this reset it would be replayed as a bogus record at EOF.
        nextLine = null;

        String line;
        while ((line = reader.readLine()) != null) {
            if (!isLineInStackTrace(line)) {
                nextLine = line;
                break;
            }
            sb.append("\n").append(line);
        }

        return sb.toString();
    }

    private boolean isLineInStackTrace(final String line) {
        return STACK_TRACE_PATTERN.matcher(line).find();
    }

    /**
     * Converts a captured string to the Java value for the given field type.
     * <p>
     * A {@code null} type, STRING, and any type not listed below return the string unchanged. For the listed
     * types an empty string becomes {@code null}; unparseable DATE/TIME/TIMESTAMP values (GMT) become
     * {@code null}; unparseable numbers throw {@link NumberFormatException}.
     */
    protected Object convert(final DataType fieldType, final String string) {
        if (fieldType == null) {
            return string;
        }
        switch (fieldType.getFieldType()) {
            case BOOLEAN:
                if (string.length() == 0) {
                    return null;
                }
                return Boolean.parseBoolean(string);
            case BYTE:
                if (string.length() == 0) {
                    return null;
                }
                return Byte.parseByte(string);
            case SHORT:
                if (string.length() == 0) {
                    return null;
                }
                return Short.parseShort(string);
            case INT:
                if (string.length() == 0) {
                    return null;
                }
                return Integer.parseInt(string);
            case LONG:
                if (string.length() == 0) {
                    return null;
                }
                return Long.parseLong(string);
            case FLOAT:
                if (string.length() == 0) {
                    return null;
                }
                return Float.parseFloat(string);
            case DOUBLE:
                if (string.length() == 0) {
                    return null;
                }
                return Double.parseDouble(string);
            case DATE:
                if (string.length() == 0) {
                    return null;
                }
                try {
                    Date date = TIME_FORMAT_DATE.parse(string);
                    return new java.sql.Date(date.getTime());
                } catch (ParseException e) {
                    return null;
                }
            case TIME:
                if (string.length() == 0) {
                    return null;
                }
                try {
                    Date date = TIME_FORMAT_TIME.parse(string);
                    return new java.sql.Time(date.getTime());
                } catch (ParseException e) {
                    return null;
                }
            case TIMESTAMP:
                if (string.length() == 0) {
                    return null;
                }
                try {
                    Date date = TIME_FORMAT_TIMESTAMP.parse(string);
                    return new java.sql.Timestamp(date.getTime());
                } catch (ParseException e) {
                    return null;
                }
            case STRING:
            default:
                return string;
        }
    }

    /**
     * Returns the schema derived from the Grok expression: one field per named capture
     * ({@code %{PATTERN:name}}), in order, followed by {@value #STACK_TRACE_COLUMN_NAME}. Unnamed references
     * such as {@code %{WORD}} do not produce a field. The schema is built once and cached.
     */
    @Override
    public RecordSchema getSchema() {
        if (schema != null) {
            return schema;
        }

        final List<RecordField> fields = new ArrayList<>();

        String grokExpression = grok.getOriginalGrokPattern();
        while (grokExpression.length() > 0) {
            final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression);
            if (!matcher.find()) {
                // No further %{...} references; stop instead of rescanning the same text forever.
                break;
            }

            final Map<String, String> namedGroups = GrokUtils.namedGroups(matcher, grokExpression);
            final String fieldName = namedGroups.get("subname");
            if (fieldName != null) {
                DataType dataType = fieldTypeOverrides.get(fieldName);
                if (dataType == null) {
                    dataType = RecordFieldType.STRING.getDataType();
                }
                fields.add(new RecordField(fieldName, dataType));
            }

            // Resume exactly at the end of the match; skipping an extra character would break a directly
            // adjacent reference such as "%{A:a}%{B:b}". The match is non-empty, so the loop always advances.
            grokExpression = grokExpression.substring(matcher.end());
        }

        fields.add(new RecordField(STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType()));

        schema = new SimpleRecordSchema(fields);
        return schema;
    }

}
