http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java new file mode 100644 index 0000000..1ac6b36 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup.maxmind; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.RecordLookupService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.StopWatch; + +import com.maxmind.geoip2.model.AnonymousIpResponse; +import com.maxmind.geoip2.model.CityResponse; +import com.maxmind.geoip2.model.ConnectionTypeResponse; +import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType; +import com.maxmind.geoip2.model.DomainResponse; +import com.maxmind.geoip2.model.IspResponse; +import com.maxmind.geoip2.record.Country; +import com.maxmind.geoip2.record.Location; +import com.maxmind.geoip2.record.Subdivision; + +@Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", "cellular", "anonymous", "tor"}) +@CapabilityDescription("A lookup service that provides several types of enrichment information for IP addresses. The service is configured by providing a MaxMind " + + "Database file and specifying which types of enrichment should be provided for an IP Address. Each type of enrichment is a separate lookup, so configuring the " + + "service to provide all of the available enrichment data may be slower than returning only a portion of the available enrichments. View the Usage of this component " + + "and choose to view Additional Details for more information, such as the Schema that pertains to the information that is returned.") +public class IPLookupService extends AbstractControllerService implements RecordLookupService { + private volatile DatabaseReader databaseReader = null; + + static final PropertyDescriptor GEO_DATABASE_FILE = new PropertyDescriptor.Builder() + .name("database-file") + .displayName("MaxMind Database File") + .description("Path to Maxmind IP Enrichment Database File") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .build(); + static final PropertyDescriptor LOOKUP_CITY = new PropertyDescriptor.Builder() + .name("lookup-city") + .displayName("Lookup Geo Enrichment") + .description("Specifies whether or not information about the geographic information, such as cities, corresponding to the IP address should be returned") + .allowableValues("true", "false") + .defaultValue("true") + .expressionLanguageSupported(false) + .required(true) + .build(); + static final PropertyDescriptor LOOKUP_ISP = new PropertyDescriptor.Builder() + .name("lookup-isp") + .displayName("Lookup ISP") + .description("Specifies whether or not information about the Information Service Provider corresponding to the IP address should be returned") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + static final PropertyDescriptor LOOKUP_DOMAIN = new PropertyDescriptor.Builder() + .name("lookup-domain") + .displayName("Lookup Domain Name") + .description("Specifies whether or not information about the Domain Name corresponding to the IP address should be returned. " + + "If true, the lookup will contain second-level domain information, such as foo.com but will not contain bar.foo.com") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + static final PropertyDescriptor LOOKUP_CONNECTION_TYPE = new PropertyDescriptor.Builder() + .name("lookup-connection-type") + .displayName("Lookup Connection Type") + .description("Specifies whether or not information about the Connection Type corresponding to the IP address should be returned. " + + "If true, the lookup will contain a 'connectionType' field that (if populated) will contain a value of 'Dialup', 'Cable/DSL', 'Corporate', or 'Cellular'") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + static final PropertyDescriptor LOOKUP_ANONYMOUS_IP_INFO = new PropertyDescriptor.Builder() + .name("lookup-anonymous-ip") + .displayName("Lookup Anonymous IP Information") + .description("Specifies whether or not information about whether or not the IP address belongs to an anonymous network should be returned.") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(GEO_DATABASE_FILE); + properties.add(LOOKUP_CITY); + properties.add(LOOKUP_ISP); + properties.add(LOOKUP_DOMAIN); + properties.add(LOOKUP_CONNECTION_TYPE); + properties.add(LOOKUP_ANONYMOUS_IP_INFO); + return properties; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws IOException { + final String dbFileString = context.getProperty(GEO_DATABASE_FILE).getValue(); + final File dbFile = new File(dbFileString); + final StopWatch stopWatch = new StopWatch(true); + final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build(); + stopWatch.stop(); + getLogger().info("Completed loading of Maxmind Database. Elapsed time was {} milliseconds.", new Object[] {stopWatch.getDuration(TimeUnit.MILLISECONDS)}); + databaseReader = reader; + } + + @OnStopped + public void closeReader() throws IOException { + final DatabaseReader reader = databaseReader; + if (reader != null) { + reader.close(); + } + } + + @Override + public Optional<Record> lookup(final String key) throws LookupFailureException { + if (key == null) { + return Optional.empty(); + } + + final InetAddress inetAddress; + try { + inetAddress = InetAddress.getByName(key); + } catch (final IOException ioe) { + getLogger().warn("Could not resolve the IP for value '{}'. This is usually caused by issue resolving the appropriate DNS record or " + + "providing the service with an invalid IP address", new Object[] {key}, ioe); + + return Optional.empty(); + } + + final Record geoRecord; + if (getProperty(LOOKUP_CITY).asBoolean()) { + final CityResponse cityResponse; + try { + cityResponse = databaseReader.city(inetAddress); + } catch (final Exception e) { + throw new LookupFailureException("Failed to lookup City information for IP Address " + inetAddress, e); + } + + geoRecord = createRecord(cityResponse); + } else { + geoRecord = null; + } + + final Record ispRecord; + if (getProperty(LOOKUP_ISP).asBoolean()) { + final IspResponse ispResponse; + try { + ispResponse = databaseReader.isp(inetAddress); + } catch (final Exception e) { + throw new LookupFailureException("Failed to lookup ISP information for IP Address " + inetAddress, e); + } + + ispRecord = createRecord(ispResponse); + } else { + ispRecord = null; + } + + final String domainName; + if (getProperty(LOOKUP_DOMAIN).asBoolean()) { + final DomainResponse domainResponse; + try { + domainResponse = databaseReader.domain(inetAddress); + } catch (final Exception e) { + throw new LookupFailureException("Failed to lookup Domain information for IP Address " + inetAddress, e); + } + + domainName = domainResponse == null ? null : domainResponse.getDomain(); + } else { + domainName = null; + } + + final String connectionType; + if (getProperty(LOOKUP_CONNECTION_TYPE).asBoolean()) { + final ConnectionTypeResponse connectionTypeResponse; + try { + connectionTypeResponse = databaseReader.connectionType(inetAddress); + } catch (final Exception e) { + throw new LookupFailureException("Failed to lookup Domain information for IP Address " + inetAddress, e); + } + + if (connectionTypeResponse == null) { + connectionType = null; + } else { + final ConnectionType type = connectionTypeResponse.getConnectionType(); + connectionType = type == null ? null : type.name(); + } + } else { + connectionType = null; + } + + final Record anonymousIpRecord; + if (getProperty(LOOKUP_ANONYMOUS_IP_INFO).asBoolean()) { + final AnonymousIpResponse anonymousIpResponse; + try { + anonymousIpResponse = databaseReader.anonymousIp(inetAddress); + } catch (final Exception e) { + throw new LookupFailureException("Failed to lookup Anonymous IP Information for IP Address " + inetAddress, e); + } + + anonymousIpRecord = createRecord(anonymousIpResponse); + } else { + anonymousIpRecord = null; + } + + return Optional.ofNullable(createContainerRecord(geoRecord, ispRecord, domainName, connectionType, anonymousIpRecord)); + } + + private Record createRecord(final CityResponse city) { + if (city == null) { + return null; + } + + final Map<String, Object> values = new HashMap<>(); + values.put(CitySchema.CITY.getFieldName(), city.getCity().getName()); + + final Location location = city.getLocation(); + values.put(CitySchema.ACCURACY.getFieldName(), location.getAccuracyRadius()); + values.put(CitySchema.METRO_CODE.getFieldName(), location.getMetroCode()); + values.put(CitySchema.TIMEZONE.getFieldName(), location.getTimeZone()); + values.put(CitySchema.LATITUDE.getFieldName(), location.getLatitude()); + values.put(CitySchema.LONGITUDE.getFieldName(), location.getLongitude()); + values.put(CitySchema.CONTINENT.getFieldName(), city.getContinent().getName()); + values.put(CitySchema.POSTALCODE.getFieldName(), city.getPostal().getCode()); + values.put(CitySchema.COUNTRY.getFieldName(), createRecord(city.getCountry())); + + final Object[] subdivisions = new Object[city.getSubdivisions().size()]; + int i = 0; + for (final Subdivision subdivision : city.getSubdivisions()) { + subdivisions[i++] = createRecord(subdivision); + } + values.put(CitySchema.SUBDIVISIONS.getFieldName(), subdivisions); + + return new MapRecord(CitySchema.GEO_SCHEMA, values); + } + + private Record createRecord(final Subdivision subdivision) { + if (subdivision == null) { + return null; + } + + final Map<String, Object> values = new HashMap<>(2); + values.put(CitySchema.SUBDIVISION_NAME.getFieldName(), subdivision.getName()); + values.put(CitySchema.SUBDIVISION_ISO.getFieldName(), subdivision.getIsoCode()); + return new MapRecord(CitySchema.SUBDIVISION_SCHEMA, values); + } + + private Record createRecord(final Country country) { + if (country == null) { + return null; + } + + final Map<String, Object> values = new HashMap<>(2); + values.put(CitySchema.COUNTRY_NAME.getFieldName(), country.getName()); + values.put(CitySchema.COUNTRY_ISO.getFieldName(), country.getIsoCode()); + return new MapRecord(CitySchema.COUNTRY_SCHEMA, values); + } + + private Record createRecord(final IspResponse isp) { + if (isp == null) { + return null; + } + + final Map<String, Object> values = new HashMap<>(4); + values.put(IspSchema.ASN.getFieldName(), isp.getAutonomousSystemNumber()); + values.put(IspSchema.ASN_ORG.getFieldName(), isp.getAutonomousSystemOrganization()); + values.put(IspSchema.NAME.getFieldName(), isp.getIsp()); + values.put(IspSchema.ORG.getFieldName(), isp.getOrganization()); + + return new MapRecord(IspSchema.ISP_SCHEMA, values); + } + + private Record createRecord(final AnonymousIpResponse anonymousIp) { + if (anonymousIp == null) { + return null; + } + + final Map<String, Object> values = new HashMap<>(5); + values.put(AnonymousIpSchema.ANONYMOUS.getFieldName(), anonymousIp.isAnonymous()); + values.put(AnonymousIpSchema.ANONYMOUS_VPN.getFieldName(), anonymousIp.isAnonymousVpn()); + values.put(AnonymousIpSchema.HOSTING_PROVIDER.getFieldName(), anonymousIp.isHostingProvider()); + values.put(AnonymousIpSchema.PUBLIC_PROXY.getFieldName(), anonymousIp.isPublicProxy()); + values.put(AnonymousIpSchema.TOR_EXIT_NODE.getFieldName(), anonymousIp.isTorExitNode()); + + return new MapRecord(AnonymousIpSchema.ANONYMOUS_IP_SCHEMA, values); + } + + private Record createContainerRecord(final Record geoRecord, final Record ispRecord, final String domainName, final String connectionType, final Record anonymousIpRecord) { + final Map<String, Object> values = new HashMap<>(4); + values.put("geo", geoRecord); + values.put("isp", ispRecord); + values.put("domainName", domainName); + values.put("connectionType", connectionType); + values.put("anonymousIp", anonymousIpRecord); + + final Record containerRecord = new MapRecord(ContainerSchema.CONTAINER_SCHEMA, values); + return containerRecord; + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IspSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IspSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IspSchema.java new file mode 100644 index 0000000..a487716 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IspSchema.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup.maxmind; + +import java.util.Arrays; + +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +public class IspSchema { + static final RecordField NAME = new RecordField("name", RecordFieldType.STRING.getDataType()); + static final RecordField ORG = new RecordField("organization", RecordFieldType.STRING.getDataType()); + static final RecordField ASN = new RecordField("asn", RecordFieldType.INT.getDataType()); + static final RecordField ASN_ORG = new RecordField("asnOrganization", RecordFieldType.STRING.getDataType()); + + static final RecordSchema ISP_SCHEMA = new SimpleRecordSchema(Arrays.asList(NAME, ORG, ASN, ASN_ORG)); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..cdff77c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.lookup.maxmind.IPLookupService +org.apache.nifi.lookup.SimpleKeyValueLookupService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html new file mode 100644 index 0000000..8f64510 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html @@ -0,0 +1,102 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>IPLookupService</title> + + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> + </head> + + <body> + <p> + The IPLookupService is powered by a MaxMind database and can return several different types of enrichment information + about a given IP address. Below is the schema of the Record that is returned by this service (in Avro Schema format). + </p> + +<code> +<pre> +{ + "name": "ipEnrichment", + "namespace": "nifi", + "type": "record", + "fields": [ + { + "name": "geo", + "type": { + "name": "cityGeo", + "type": "record", + "fields": [ + { "name": "city", "type": "string" }, + { "name": "accuracy", "type": "int", "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" }, + { "name": "metroCode", "type": "int" }, + { "name": "timeZone", "type": "string" }, + { "name": "latitude", "type": "double" }, + { "name": "longitude", "type": "double" }, + { "name": "country", "type": { + "type": "record", + "name": "country", + "fields": [ + { "name": "name", "type": "string" }, + { "name": "isoCode", "type": "string" } + ] + } }, + { "name": "subdivisions", "type": { + "type": "array", + "items": { + "type": "record", + "name": "subdivision", + "fields": [ + { "name": "name", "type": "string" }, + { "name": "isoCode", "type": "string" } + ] + } + } + }, + { "name": "continent", "type": "string" }, + { "name": "postalCode", "type": "string" } + ] + } + }, + { + "name": "isp", + "type": { + "name": "ispEnrich", + "type": "record", + "fields": [ + { "name": "name", "type": "string" }, + { "name": "organization", "type": "string" }, + { "name": "asn", "type": "int" }, + { "name": "asnOrganization", "type": "string" } + ] + } + }, + { + "name": "domainName", + "type": "string" + }, + { + "name": "connectionType", + "type": "string", + "doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'" + } + ] +} +</pre> +</code> + + </body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/pom.xml new file mode 100644 index 0000000..a317cb2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/pom.xml @@ -0,0 +1,28 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services</artifactId> + <version>1.3.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-lookup-services-bundle</artifactId> + <packaging>pom</packaging> + <modules> + <module>nifi-lookup-services</module> + <module>nifi-lookup-services-nar</module> + </modules> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java index 9ea6e64..c819a6c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java @@ -19,6 +19,7 @@ package org.apache.nifi.serialization; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.flowfile.FlowFile; @@ -66,9 +67,11 @@ public interface RecordSetWriterFactory extends ControllerService { * @param logger the logger to use when logging information. This is passed in, rather than using the logger of the Controller Service * because it allows messages to be logged for the component that is calling this Controller Service. * @param schema the schema that will be used for writing records + * @param flowFile the FlowFile to write to + * @param out the OutputStream to write to * * @return a RecordSetWriter that can write record sets to an OutputStream * @throws IOException if unable to read from the given InputStream */ - RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException; + RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index 1360add..70da1e8 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -18,6 +18,7 @@ package org.apache.nifi.avro; import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.EnumSet; import java.util.LinkedHashMap; @@ -31,6 +32,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.schema.access.SchemaField; @@ -57,7 +59,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement "The FlowFile will have the Avro schema embedded into the content, as is typical with Avro"); @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema) throws IOException { + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final FlowFile flowFile, final OutputStream out) throws IOException { final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue(); try { @@ -78,9 +80,9 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement } if (AVRO_EMBEDDED.getValue().equals(strategyValue)) { - return new WriteAvroResultWithSchema(avroSchema); + return new WriteAvroResultWithSchema(avroSchema, out); } else { - return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema)); + return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out); } } catch (final SchemaNotFoundException e) { throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java index c09e3d5..799d3ee 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java @@ -32,9 +32,11 @@ import java.util.Collections; public abstract class WriteAvroResult implements RecordSetWriter { private final Schema schema; + private final OutputStream out; - public WriteAvroResult(final Schema schema) { + public WriteAvroResult(final Schema schema, final OutputStream out) { this.schema = schema; + this.out = out; } protected Schema getSchema() { @@ -42,7 +44,7 @@ public abstract class WriteAvroResult implements RecordSetWriter { } @Override - public WriteResult write(final Record record, final OutputStream out) throws IOException { + public WriteResult write(final Record record) throws IOException { final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); @@ -58,12 +60,4 @@ public abstract class WriteAvroResult implements RecordSetWriter { public String getMimeType() { return "application/avro-binary"; } - - public static String normalizeNameForAvro(String inputName) { - String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_"); - if (Character.isDigit(normalizedName.charAt(0))) { - normalizedName = "_" + normalizedName; - } - return normalizedName; - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java index 1cfb02d..a3f9cb8 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java @@ -20,7 +20,7 @@ package org.apache.nifi.avro; import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.util.Collections; +import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumWriter; @@ -29,66 +29,53 @@ import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.EncoderFactory; import org.apache.nifi.schema.access.SchemaAccessWriter; +import org.apache.nifi.serialization.AbstractRecordSetWriter; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.RecordSet; -public class WriteAvroResultWithExternalSchema extends WriteAvroResult { +public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter { private final SchemaAccessWriter schemaAccessWriter; private final RecordSchema recordSchema; - - public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccessWriter) { - super(avroSchema); + private final Schema avroSchema; + private final BinaryEncoder encoder; + private final OutputStream buffered; + private final DatumWriter<GenericRecord> datumWriter; + + public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema, + final SchemaAccessWriter schemaAccessWriter, final OutputStream out) throws IOException { + super(out); this.recordSchema = recordSchema; this.schemaAccessWriter = schemaAccessWriter; + this.avroSchema = avroSchema; + this.buffered = new BufferedOutputStream(out); + + datumWriter = new GenericDatumWriter<>(avroSchema); + schemaAccessWriter.writeHeader(recordSchema, buffered); + encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null); } @Override - public WriteResult write(final RecordSet rs, final OutputStream outStream) throws IOException { - Record record = rs.next(); - if (record == null) { - return WriteResult.of(0, Collections.emptyMap()); - } - - int nrOfRows = 0; - final Schema schema = getSchema(); - final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); - - final BufferedOutputStream bufferedOut = new BufferedOutputStream(outStream); - schemaAccessWriter.writeHeader(recordSchema, bufferedOut); - - final BinaryEncoder encoder = EncoderFactory.get().blockingBinaryEncoder(bufferedOut, null); - - do { - final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); - - datumWriter.write(rec, encoder); - encoder.flush(); - nrOfRows++; - } while ((record = rs.next()) != null); - - bufferedOut.flush(); - - return WriteResult.of(nrOfRows, schemaAccessWriter.getAttributes(recordSchema)); + protected void onBeginRecordSet() throws IOException { + schemaAccessWriter.writeHeader(recordSchema, buffered); } @Override - public WriteResult write(final Record record, final OutputStream out) throws IOException { - final Schema schema = getSchema(); - final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); - - final BufferedOutputStream bufferedOut = new BufferedOutputStream(out); - schemaAccessWriter.writeHeader(recordSchema, bufferedOut); - - final BinaryEncoder encoder = EncoderFactory.get().blockingBinaryEncoder(bufferedOut, null); - final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); - - datumWriter.write(rec, encoder); + protected Map<String, String> onFinishRecordSet() throws IOException { encoder.flush(); + buffered.flush(); + return schemaAccessWriter.getAttributes(recordSchema); + } - bufferedOut.flush(); - + @Override + public WriteResult write(final Record record) throws IOException { + final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, avroSchema); + datumWriter.write(rec, encoder); return WriteResult.of(1, schemaAccessWriter.getAttributes(recordSchema)); } + + @Override + public String getMimeType() { + return "application/avro-binary"; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java index 73d70d9..9bfb4cf 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java @@ -25,57 +25,38 @@ import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; +import org.apache.nifi.serialization.AbstractRecordSetWriter; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordSet; -public class WriteAvroResultWithSchema extends WriteAvroResult { +public class WriteAvroResultWithSchema extends AbstractRecordSetWriter { - public WriteAvroResultWithSchema(final Schema schema) { - super(schema); - } - - @Override - public WriteResult write(final RecordSet rs, final OutputStream outStream) throws IOException { - Record record = rs.next(); - if (record == null) { - return WriteResult.of(0, Collections.emptyMap()); - } - - int nrOfRows = 0; - final Schema schema = getSchema(); - final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); + private final DataFileWriter<GenericRecord> dataFileWriter; + private final Schema schema; - try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { - dataFileWriter.create(schema, outStream); + public WriteAvroResultWithSchema(final Schema schema, final OutputStream out) throws IOException { + super(out); + this.schema = schema; - do { - final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); - dataFileWriter.append(rec); - nrOfRows++; - } while ((record = rs.next()) != null); - } - - return WriteResult.of(nrOfRows, Collections.emptyMap()); + final GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); + dataFileWriter = new DataFileWriter<>(datumWriter); + dataFileWriter.create(schema, out); } @Override - public WriteResult write(final Record record, final OutputStream out) throws IOException { - if (record == null) { - return WriteResult.of(0, Collections.emptyMap()); - } - - final Schema schema = getSchema(); - final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); - - try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { - dataFileWriter.create(schema, out); - - final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); - dataFileWriter.append(rec); - } + public void close() throws IOException { + dataFileWriter.close(); + } + @Override + public WriteResult write(final Record record) throws IOException { + final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); + dataFileWriter.append(rec); return WriteResult.of(1, Collections.emptyMap()); } + + @Override + public String getMimeType() { + return "application/avro-binary"; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java index d4f066f..c5e6b19 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java @@ -18,6 +18,7 @@ package org.apache.nifi.csv; import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.List; @@ -27,6 +28,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.DateTimeTextRecordSetWriter; @@ -67,8 +69,8 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) throws SchemaNotFoundException, IOException { - return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) throws SchemaNotFoundException, IOException { + return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out, getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java index f11249b..8475a50 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java @@ -20,38 +20,44 @@ package org.apache.nifi.csv; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; -import java.util.Collections; +import java.util.Map; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; import org.apache.nifi.schema.access.SchemaAccessWriter; +import org.apache.nifi.serialization.AbstractRecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.RecordSet; -import org.apache.nifi.stream.io.NonCloseableOutputStream; -public class WriteCSVResult implements RecordSetWriter { - private final CSVFormat csvFormat; +public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSetWriter { private final RecordSchema recordSchema; private final SchemaAccessWriter schemaWriter; private final String dateFormat; private final String timeFormat; private final String timestampFormat; - private final boolean includeHeaderLine; + private final CSVPrinter printer; + private final Object[] fieldValues; - public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, - final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine) { - this.csvFormat = csvFormat; + public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out, + final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine) throws IOException { + + super(out); this.recordSchema = recordSchema; this.schemaWriter = schemaWriter; this.dateFormat = dateFormat; this.timeFormat = timeFormat; this.timestampFormat = timestampFormat; - this.includeHeaderLine = includeHeaderLine; + + final String[] columnNames = recordSchema.getFieldNames().toArray(new String[0]); + final CSVFormat formatWithHeader = csvFormat.withHeader(columnNames).withSkipHeaderRecord(!includeHeaderLine); + final OutputStreamWriter streamWriter = new OutputStreamWriter(out); + printer = new CSVPrinter(streamWriter, formatWithHeader); + + fieldValues = new Object[recordSchema.getFieldCount()]; } private String getFormat(final RecordField field) { @@ -69,60 +75,29 @@ public class WriteCSVResult implements RecordSetWriter { } @Override - public WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException { - int count = 0; - - final String[] columnNames = recordSchema.getFieldNames().toArray(new String[0]); - final CSVFormat formatWithHeader = csvFormat.withHeader(columnNames).withSkipHeaderRecord(!includeHeaderLine); + protected void onBeginRecordSet() throws IOException { + schemaWriter.writeHeader(recordSchema, getOutputStream()); + } - schemaWriter.writeHeader(recordSchema, rawOut); - - try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut); - final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable); - final CSVPrinter printer = new CSVPrinter(streamWriter, formatWithHeader)) { - - try { - Record record; - while ((record = rs.next()) != null) { - final Object[] colVals = new Object[recordSchema.getFieldCount()]; - int i = 0; - for (final RecordField recordField : recordSchema.getFields()) { - colVals[i++] = record.getAsString(recordField, getFormat(recordField)); - } - - printer.printRecord(colVals); - count++; - } - } catch (final Exception e) { - throw new IOException("Failed to serialize results", e); - } - } + @Override + protected Map<String, String> onFinishRecordSet() throws IOException { + return schemaWriter.getAttributes(recordSchema); + } - return WriteResult.of(count, schemaWriter.getAttributes(recordSchema)); + @Override + public void close() throws IOException { + printer.close(); } @Override - public WriteResult write(final Record record, final OutputStream rawOut) throws IOException { - - try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut); - final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable); - final CSVPrinter printer = new CSVPrinter(streamWriter, csvFormat)) { - - try { - final RecordSchema schema = record.getSchema(); - final Object[] colVals = new Object[schema.getFieldCount()]; - int i = 0; - for (final RecordField recordField : schema.getFields()) { - colVals[i++] = record.getAsString(recordField, getFormat(recordField)); - } - - printer.printRecord(colVals); - } catch (final Exception e) { - throw new IOException("Failed to serialize results", e); - } + public WriteResult write(final Record record) throws IOException { + int i = 0; + for (final RecordField recordField : recordSchema.getFields()) { + fieldValues[i++] = record.getAsString(recordField, getFormat(recordField)); } - return WriteResult.of(1, Collections.emptyMap()); + printer.printRecord(fieldValues); + return WriteResult.of(1, schemaWriter.getAttributes(recordSchema)); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java index 91370a7..9a722c1 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java @@ -18,6 +18,7 @@ package org.apache.nifi.json; import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.List; @@ -26,6 +27,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.DateTimeTextRecordSetWriter; @@ -62,8 +64,8 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) throws SchemaNotFoundException, IOException { - return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), prettyPrint, + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) throws SchemaNotFoundException, IOException { + return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint, getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null)); } http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java index bd7dc5e..b73ecab 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java @@ -20,13 +20,13 @@ package org.apache.nifi.json; import java.io.IOException; import java.io.OutputStream; import java.math.BigInteger; -import java.sql.SQLException; import java.text.DateFormat; import java.util.Map; import java.util.Optional; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaAccessWriter; +import org.apache.nifi.serialization.AbstractRecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.DataType; @@ -34,87 +34,76 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.serialization.record.SerializedForm; import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.type.MapDataType; import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; -import org.apache.nifi.stream.io.NonCloseableOutputStream; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; -public class WriteJsonResult implements RecordSetWriter { +public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSetWriter { private final ComponentLog logger; - private final boolean prettyPrint; private final SchemaAccessWriter schemaAccess; private final RecordSchema recordSchema; private final JsonFactory factory = new JsonFactory(); private final DateFormat dateFormat; private final DateFormat timeFormat; private final DateFormat timestampFormat; + private final JsonGenerator generator; - public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final boolean prettyPrint, - final String dateFormat, final String timeFormat, final String timestampFormat) { + public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint, + final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException { + super(out); this.logger = logger; this.recordSchema = recordSchema; - this.prettyPrint = prettyPrint; this.schemaAccess = schemaAccess; this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat); this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat); this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat); - } - @Override - public WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException { - int count = 0; + this.generator = factory.createJsonGenerator(out); + if (prettyPrint) { + generator.useDefaultPrettyPrinter(); + } + } - schemaAccess.writeHeader(recordSchema, rawOut); - try (final JsonGenerator generator = factory.createJsonGenerator(new NonCloseableOutputStream(rawOut))) { - if (prettyPrint) { - generator.useDefaultPrettyPrinter(); - } + @Override + protected void onBeginRecordSet() throws IOException { + final OutputStream out = getOutputStream(); + schemaAccess.writeHeader(recordSchema, out); - generator.writeStartArray(); + generator.writeStartArray(); + } - Record record; - while ((record = rs.next()) != null) { - count++; - writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject()); - } + @Override + protected Map<String, String> onFinishRecordSet() throws IOException { + generator.writeEndArray(); + return schemaAccess.getAttributes(recordSchema); + } - generator.writeEndArray(); - } catch (final SQLException e) { - throw new IOException("Failed to serialize Result Set to stream", e); + @Override + public void close() throws IOException { + if (generator != null) { + generator.close(); } - return WriteResult.of(count, schemaAccess.getAttributes(recordSchema)); + super.close(); } @Override - public WriteResult write(final Record record, final OutputStream rawOut) throws IOException { - schemaAccess.writeHeader(recordSchema, rawOut); - - try (final JsonGenerator generator = factory.createJsonGenerator(new NonCloseableOutputStream(rawOut))) { - if (prettyPrint) { - generator.useDefaultPrettyPrinter(); - } - - writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject()); - } catch (final SQLException e) { - throw new IOException("Failed to write records to stream", e); - } - + public WriteResult write(final Record record) throws IOException { + writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject()); return WriteResult.of(1, schemaAccess.getAttributes(recordSchema)); } private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator, final GeneratorTask startTask, final GeneratorTask endTask) - throws JsonGenerationException, IOException, SQLException { + throws JsonGenerationException, IOException { final Optional<SerializedForm> serializedForm = record.getSerializedForm(); if (serializedForm.isPresent()) { @@ -155,7 +144,7 @@ public class WriteJsonResult implements RecordSetWriter { @SuppressWarnings("unchecked") private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType, final boolean moreCols) - throws JsonGenerationException, IOException, SQLException { + throws JsonGenerationException, IOException { if (value == null) { generator.writeNull(); return; @@ -268,7 +257,7 @@ public class WriteJsonResult implements RecordSetWriter { } private void writeArray(final Object[] values, final String fieldName, final JsonGenerator generator, final DataType elementType) - throws JsonGenerationException, IOException, SQLException { + throws JsonGenerationException, IOException { generator.writeStartArray(); for (int i = 0; i < values.length; i++) { final boolean moreEntries = i < values.length - 1; http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java index 4fcc3a2..e6a9054 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java @@ -19,6 +19,7 @@ package org.apache.nifi.text; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; @@ -81,8 +82,8 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter i } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) { - return new FreeFormTextWriter(textValue, characterSet); + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { + return new FreeFormTextWriter(textValue, characterSet, out); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java index 39416c8..95f2a73 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java @@ -27,42 +27,24 @@ import java.util.List; import java.util.Map; import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.serialization.AbstractRecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.RecordSet; -public class FreeFormTextWriter implements RecordSetWriter { +public class FreeFormTextWriter extends AbstractRecordSetWriter implements RecordSetWriter { private static final byte NEW_LINE = (byte) '\n'; private final PropertyValue propertyValue; private final Charset charset; + private final OutputStream out; - public FreeFormTextWriter(final PropertyValue textPropertyValue, final Charset characterSet) { - propertyValue = textPropertyValue; - charset = characterSet; - } - - @Override - public WriteResult write(final RecordSet recordSet, final OutputStream out) throws IOException { - int count = 0; - - try { - Record record; - while ((record = recordSet.next()) != null) { - final RecordSchema schema = record.getSchema(); - final List<String> colNames = getColumnNames(schema); - - count++; - write(record, out, colNames); - } - } catch (final Exception e) { - throw new ProcessException(e); - } - - return WriteResult.of(count, Collections.emptyMap()); + public FreeFormTextWriter(final PropertyValue textPropertyValue, final Charset characterSet, final OutputStream out) { + super(out); + this.propertyValue = textPropertyValue; + this.charset = characterSet; + this.out = out; } private List<String> getColumnNames(final RecordSchema schema) { @@ -78,7 +60,7 @@ public class FreeFormTextWriter implements RecordSetWriter { } @Override - public WriteResult write(final Record record, final OutputStream out) throws IOException { + public WriteResult write(final Record record) throws IOException { write(record, out, getColumnNames(record.getSchema())); return WriteResult.of(1, Collections.emptyMap()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java index 0a84aec..8c20ba7 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java @@ -27,6 +27,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.math.BigDecimal; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; @@ -47,6 +48,7 @@ import org.apache.avro.LogicalType; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData.Array; import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.DataType; @@ -60,7 +62,7 @@ import org.junit.Test; public abstract class TestWriteAvroResult { - protected abstract WriteAvroResult createWriter(Schema schema); + protected abstract RecordSetWriter createWriter(Schema schema, OutputStream out) throws IOException; protected abstract GenericRecord readRecord(InputStream in, Schema schema) throws IOException; @@ -80,7 +82,7 @@ public abstract class TestWriteAvroResult { } private void testLogicalTypes(Schema schema) throws ParseException, IOException { - final WriteAvroResult writer = createWriter(schema); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("timeMillis", RecordFieldType.TIME.getDataType())); @@ -108,12 +110,12 @@ public abstract class TestWriteAvroResult { values.put("decimal", expectedDecimal.doubleValue()); final Record record = new MapRecord(recordSchema, values); - final byte[] data; - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - writer.write(RecordSet.of(record.getSchema(), record), baos); - data = baos.toByteArray(); + try (final RecordSetWriter writer = createWriter(schema, baos)) { + writer.write(RecordSet.of(record.getSchema(), record)); } + final byte[] data = baos.toByteArray(); + try (final InputStream in = new ByteArrayInputStream(data)) { final GenericRecord avroRecord = readRecord(in, schema); final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60); @@ -138,7 +140,7 @@ public abstract class TestWriteAvroResult { @Test public void testDataTypes() throws IOException { final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/datatypes.avsc")); - final WriteAvroResult writer = createWriter(schema); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final List<RecordField> subRecordFields = Collections.singletonList(new RecordField("field1", RecordFieldType.STRING.getDataType())); final RecordSchema subRecordSchema = new SimpleRecordSchema(subRecordFields); @@ -178,13 +180,14 @@ public abstract class TestWriteAvroResult { final Record record = new MapRecord(recordSchema, values); - final byte[] data; - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - final WriteResult writeResult = writer.write(RecordSet.of(record.getSchema(), record), baos); - verify(writeResult); - data = baos.toByteArray(); + final WriteResult writeResult; + try (final RecordSetWriter writer = createWriter(schema, baos)) { + writeResult = writer.write(RecordSet.of(record.getSchema(), record)); } + verify(writeResult); + final byte[] data = baos.toByteArray(); + try (final InputStream in = new ByteArrayInputStream(data)) { final GenericRecord avroRecord = readRecord(in, schema); assertMatch(record, avroRecord); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java index 6ace012..9761076 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java @@ -19,6 +19,7 @@ package org.apache.nifi.avro; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; @@ -26,12 +27,13 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.StringType; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.serialization.RecordSetWriter; public class TestWriteAvroResultWithSchema extends TestWriteAvroResult { @Override - protected WriteAvroResult createWriter(final Schema schema) { - return new WriteAvroResultWithSchema(schema); + protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException { + return new WriteAvroResultWithSchema(schema, out); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java index d40bb55..c25f0aa 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java @@ -19,6 +19,7 @@ package org.apache.nifi.avro; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.Map; import org.apache.avro.Schema; @@ -27,14 +28,15 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.nifi.schema.access.SchemaTextAsAttribute; +import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.WriteResult; import org.junit.Assert; public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult { @Override - protected WriteAvroResult createWriter(final Schema schema) { - return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new SchemaTextAsAttribute()); + protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException { + return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new SchemaTextAsAttribute(), out); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java index 9424e79..c447664 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java @@ -71,37 +71,38 @@ public class TestWriteCSVResult { } final RecordSchema schema = new SimpleRecordSchema(fields); - final WriteCSVResult result = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true); - + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final long now = System.currentTimeMillis(); - final Map<String, Object> valueMap = new HashMap<>(); - valueMap.put("string", "string"); - valueMap.put("boolean", true); - valueMap.put("byte", (byte) 1); - valueMap.put("char", 'c'); - valueMap.put("short", (short) 8); - valueMap.put("int", 9); - valueMap.put("bigint", BigInteger.valueOf(8L)); - valueMap.put("long", 8L); - valueMap.put("float", 8.0F); - valueMap.put("double", 8.0D); - valueMap.put("date", new Date(now)); - valueMap.put("time", new Time(now)); - valueMap.put("timestamp", new Timestamp(now)); - valueMap.put("record", null); - valueMap.put("choice", 48L); - valueMap.put("array", null); - - final Record record = new MapRecord(schema, valueMap); - final RecordSet rs = RecordSet.of(schema, record); - - final String output; - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - result.write(rs, baos); - output = new String(baos.toByteArray(), StandardCharsets.UTF_8); + + try (final WriteCSVResult result = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + + final Map<String, Object> valueMap = new HashMap<>(); + valueMap.put("string", "string"); + valueMap.put("boolean", true); + valueMap.put("byte", (byte) 1); + valueMap.put("char", 'c'); + valueMap.put("short", (short) 8); + valueMap.put("int", 9); + valueMap.put("bigint", BigInteger.valueOf(8L)); + valueMap.put("long", 8L); + valueMap.put("float", 8.0F); + valueMap.put("double", 8.0D); + valueMap.put("date", new Date(now)); + valueMap.put("time", new Time(now)); + valueMap.put("timestamp", new Timestamp(now)); + valueMap.put("record", null); + valueMap.put("choice", 48L); + valueMap.put("array", null); + + final Record record = new MapRecord(schema, valueMap); + final RecordSet rs = RecordSet.of(schema, record); + + result.write(rs); } + final String output = new String(baos.toByteArray(), StandardCharsets.UTF_8); + headerBuilder.deleteCharAt(headerBuilder.length() - 1); final String headerLine = headerBuilder.toString(); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java index 1acc496..16d2012 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java @@ -72,9 +72,7 @@ public class TestWriteJsonResult { } final RecordSchema schema = new SimpleRecordSchema(fields); - final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), true, RecordFieldType.DATE.getDefaultFormat(), - RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); - + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); df.setTimeZone(TimeZone.getTimeZone("gmt")); final long time = df.parse("2017/01/01 17:00:00.000").getTime(); @@ -105,12 +103,14 @@ public class TestWriteJsonResult { final Record record = new MapRecord(schema, valueMap); final RecordSet rs = RecordSet.of(schema, record); - final String output; - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - writer.write(rs, baos); - output = baos.toString(); + try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, true, RecordFieldType.DATE.getDefaultFormat(), + RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) { + + writer.write(rs); } + final String output = baos.toString(); + final String expected = new String(Files.readAllBytes(Paths.get("src/test/resources/json/output/dataTypes.json"))); assertEquals(expected, output); } @@ -139,15 +139,15 @@ public class TestWriteJsonResult { final RecordSet rs = RecordSet.of(schema, record1, record2); - final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), true, RecordFieldType.DATE.getDefaultFormat(), - RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, true, RecordFieldType.DATE.getDefaultFormat(), + RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) { - final byte[] data; - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - writer.write(rs, baos); - data = baos.toByteArray(); + writer.write(rs); } + final byte[] data = baos.toByteArray(); + final String expected = "[ " + serialized1 + ", " + serialized2 + " ]"; final String output = new String(data, StandardCharsets.UTF_8); @@ -171,14 +171,13 @@ public class TestWriteJsonResult { final Record record = new MapRecord(schema, values); final RecordSet rs = RecordSet.of(schema, record); - final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), false, null, null, null); - - final byte[] data; - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - writer.write(rs, baos); - data = baos.toByteArray(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) { + writer.write(rs); } + final byte[] data = baos.toByteArray(); + final String expected = "[{\"timestamp\":37293723,\"time\":37293723,\"date\":37293723}]"; final String output = new String(data, StandardCharsets.UTF_8); http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml index ba1b490..3105d8d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml @@ -68,6 +68,11 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-lookup-service-api</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-record</artifactId> <scope>compile</scope> </dependency>
